PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
matview.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * matview.c
4  * materialized view support
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/matview.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/multixact.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/pg_operator.h"
25 #include "commands/cluster.h"
26 #include "commands/matview.h"
27 #include "commands/tablecmds.h"
28 #include "commands/tablespace.h"
29 #include "executor/executor.h"
30 #include "executor/spi.h"
31 #include "miscadmin.h"
32 #include "parser/parse_relation.h"
33 #include "rewrite/rewriteHandler.h"
34 #include "storage/lmgr.h"
35 #include "storage/smgr.h"
36 #include "tcop/tcopprot.h"
37 #include "utils/builtins.h"
38 #include "utils/lsyscache.h"
39 #include "utils/rel.h"
40 #include "utils/snapmgr.h"
41 #include "utils/syscache.h"
42 #include "utils/typcache.h"
43 
44 
/*
 * Private state of the DestReceiver that stores query output into a
 * transient heap.  `pub` is the first member, and the transientrel_*
 * callbacks in this file cast between DestReceiver * and a pointer to this
 * struct.
 *
 * NOTE(review): the closing brace and the typedef name are not present in
 * this listing; the rest of the file refers to the type as DR_transientrel.
 */
45 typedef struct
46 {
47  DestReceiver pub; /* publicly-known function pointers */
48  Oid transientoid; /* OID of new heap into which to store */
49  /* These fields are filled by transientrel_startup: */
50  Relation transientrel; /* relation to write to */
51  CommandId output_cid; /* cmin to insert in output tuples */
52  int hi_options; /* heap_insert performance options */
53  BulkInsertState bistate; /* bulk insert state */
55 
57 
58 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
59 static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
60 static void transientrel_shutdown(DestReceiver *self);
61 static void transientrel_destroy(DestReceiver *self);
62 static void refresh_matview_datafill(DestReceiver *dest, Query *query,
63  const char *queryString);
64 
65 static char *make_temptable_name_n(char *tempname, int n);
66 static void mv_GenerateOper(StringInfo buf, Oid opoid);
67 
68 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
69  int save_sec_context);
70 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
71 
72 static void OpenMatViewIncrementalMaintenance(void);
73 static void CloseMatViewIncrementalMaintenance(void);
74 
75 /*
76  * SetMatViewPopulatedState
77  * Mark a materialized view as populated, or not.
78  *
79  * NOTE: caller must be holding an appropriate lock on the relation.
80  */
/*
 * Inputs, as used by the visible body:
 *   relation - must be a materialized view (asserted through relkind).
 *   newstate - value written into relispopulated of the pg_class tuple.
 *
 * Raises ERROR "cache lookup failed for relation %u" when the pg_class tuple
 * obtained for the relation is not valid.  The modified tuple is written
 * with CatalogTupleUpdate() and then freed.
 *
 * NOTE(review): this listing has lost the signature line, the statements
 * that set `pgrel` and `tuple`, and the call announced by the "Advance
 * command counter" comment (see the gaps in the embedded line numbers).
 * Restore them from upstream before relying on this copy.
 */
81 void
83 {
84  Relation pgrel;
85  HeapTuple tuple;
86 
87  Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
88 
89  /*
90  * Update relation's pg_class entry. Crucial side-effect: other backends
91  * (and this one too!) are sent SI message to make them rebuild relcache
92  * entries.
93  */
97  if (!HeapTupleIsValid(tuple))
98  elog(ERROR, "cache lookup failed for relation %u",
99  RelationGetRelid(relation));
100 
101  ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
102 
103  CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
104 
105  heap_freetuple(tuple);
107 
108  /*
109  * Advance command counter to make the updated pg_class row locally
110  * visible.
111  */
113 }
114 
115 /*
116  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
117  *
118  * This refreshes the materialized view by creating a new table and swapping
119  * the relfilenodes of the new table and the old materialized view, so the OID
120  * of the original materialized view is preserved. Thus we do not lose GRANT
121  * nor references to this materialized view.
122  *
123  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
124  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
125  * statement associated with the materialized view. The statement node's
126  * skipData field shows whether the clause was used.
127  *
128  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
129  * the new heap, it's better to create the indexes afterwards than to fill them
130  * incrementally while we load.
131  *
132  * The matview's "populated" state is changed based on whether the contents
133  * reflect the result set of the materialized view's query.
134  */
/*
 * Arguments, as used by the visible body:
 *   stmt        - stmt->relation names the matview; stmt->concurrent and
 *                 stmt->skipData select the CONCURRENTLY and WITH NO DATA
 *                 behaviors (the two may not be combined).
 *   queryString - handed through to refresh_matview_datafill().
 *   params, completionTag - not referenced in the visible body.
 *
 * Returns the ObjectAddress built from RelationRelationId and the matview's
 * OID.
 *
 * Outline: lock and open the matview, validate it and its single SELECT
 * INSTEAD rule, require a usable unique index when CONCURRENTLY is given,
 * tentatively set the populated flag, switch to the owner's userid, create
 * the transient heap and fill it (unless skipData), then either merge the
 * differences (concurrent) or swap heaps, and finally restore GUC nesting
 * and the saved userid/security context.
 *
 * NOTE(review): this listing has dropped several lines of this function
 * (gaps in the embedded line numbers): the return-type line, the tail of the
 * RangeVarGetRelidExtended() call, one errmsg() argument, a statement at the
 * top of the concurrent branch, and a statement after make_new_heap().
 * Restore them from upstream before relying on this copy.
 */
136 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
137  ParamListInfo params, char *completionTag)
138 {
139  Oid matviewOid;
140  Relation matviewRel;
141  RewriteRule *rule;
142  List *actions;
143  Query *dataQuery;
144  Oid tableSpace;
145  Oid relowner;
146  Oid OIDNewHeap;
147  DestReceiver *dest;
148  bool concurrent;
149  LOCKMODE lockmode;
150  char relpersistence;
151  Oid save_userid;
152  int save_sec_context;
153  int save_nestlevel;
154  ObjectAddress address;
155 
156  /* Determine strength of lock needed. */
157  concurrent = stmt->concurrent;
158  lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
159 
160  /*
161  * Get a lock until end of transaction.
162  */
163  matviewOid = RangeVarGetRelidExtended(stmt->relation,
164  lockmode, false, false,
166  matviewRel = heap_open(matviewOid, NoLock);
167 
168  /* Make sure it is a materialized view. */
169  if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
170  ereport(ERROR,
171  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
172  errmsg("\"%s\" is not a materialized view",
173  RelationGetRelationName(matviewRel))));
174 
175  /* Check that CONCURRENTLY is not specified if not populated. */
176  if (concurrent && !RelationIsPopulated(matviewRel))
177  ereport(ERROR,
178  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
179  errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
180 
181  /* Check that conflicting options have not been specified. */
182  if (concurrent && stmt->skipData)
183  ereport(ERROR,
184  (errcode(ERRCODE_SYNTAX_ERROR),
185  errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
186 
187  /* We don't allow an oid column for a materialized view. */
188  Assert(!matviewRel->rd_rel->relhasoids);
189 
190  /*
191  * Check that everything is correct for a refresh. Problems at this point
192  * are internal errors, so elog is sufficient.
193  */
194  if (matviewRel->rd_rel->relhasrules == false ||
195  matviewRel->rd_rules->numLocks < 1)
196  elog(ERROR,
197  "materialized view \"%s\" is missing rewrite information",
198  RelationGetRelationName(matviewRel));
199 
200  if (matviewRel->rd_rules->numLocks > 1)
201  elog(ERROR,
202  "materialized view \"%s\" has too many rules",
203  RelationGetRelationName(matviewRel));
204 
205  rule = matviewRel->rd_rules->rules[0];
206  if (rule->event != CMD_SELECT || !(rule->isInstead))
207  elog(ERROR,
208  "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
209  RelationGetRelationName(matviewRel));
210 
211  actions = rule->actions;
212  if (list_length(actions) != 1)
213  elog(ERROR,
214  "the rule for materialized view \"%s\" is not a single action",
215  RelationGetRelationName(matviewRel));
216 
217  /*
218  * Check that there is a unique index with no WHERE clause on one or more
219  * columns of the materialized view if CONCURRENTLY is specified.
220  */
221  if (concurrent)
222  {
223  List *indexoidlist = RelationGetIndexList(matviewRel);
224  ListCell *indexoidscan;
225  bool hasUniqueIndex = false;
226 
227  foreach(indexoidscan, indexoidlist)
228  {
229  Oid indexoid = lfirst_oid(indexoidscan);
230  Relation indexRel;
231  Form_pg_index indexStruct;
232 
233  indexRel = index_open(indexoid, AccessShareLock);
234  indexStruct = indexRel->rd_index;
235 
/* Qualifies only if unique, valid, expression-free, non-partial and non-empty. */
236  if (indexStruct->indisunique &&
237  IndexIsValid(indexStruct) &&
238  RelationGetIndexExpressions(indexRel) == NIL &&
239  RelationGetIndexPredicate(indexRel) == NIL &&
240  indexStruct->indnatts > 0)
241  {
242  hasUniqueIndex = true;
243  index_close(indexRel, AccessShareLock);
244  break;
245  }
246 
247  index_close(indexRel, AccessShareLock);
248  }
249 
250  list_free(indexoidlist);
251 
252  if (!hasUniqueIndex)
253  ereport(ERROR,
254  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
255  errmsg("cannot refresh materialized view \"%s\" concurrently",
257  RelationGetRelationName(matviewRel))),
258  errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
259  }
260 
261  /*
262  * The stored query was rewritten at the time of the MV definition, but
263  * has not been scribbled on by the planner.
264  */
265  dataQuery = castNode(Query, linitial(actions));
266 
267  /*
268  * Check for active uses of the relation in the current transaction, such
269  * as open scans.
270  *
271  * NB: We count on this to protect us against problems with refreshing the
272  * data using HEAP_INSERT_FROZEN.
273  */
274  CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
275 
276  /*
277  * Tentatively mark the matview as populated or not (this will roll back
278  * if we fail later).
279  */
280  SetMatViewPopulatedState(matviewRel, !stmt->skipData);
281 
282  relowner = matviewRel->rd_rel->relowner;
283 
284  /*
285  * Switch to the owner's userid, so that any functions are run as that
286  * user. Also arrange to make GUC variable changes local to this command.
287  * Don't lock it down too tight to create a temporary table just yet. We
288  * will switch modes when we are about to execute user code.
289  */
290  GetUserIdAndSecContext(&save_userid, &save_sec_context);
291  SetUserIdAndSecContext(relowner,
292  save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
293  save_nestlevel = NewGUCNestLevel();
294 
295  /* Concurrent refresh builds new data in temp tablespace, and does diff. */
296  if (concurrent)
297  {
/*
 * NOTE(review): as shown here, tableSpace is never assigned on this branch
 * although make_new_heap() below reads it; a line appears to be missing
 * from the listing at this point.
 */
299  relpersistence = RELPERSISTENCE_TEMP;
300  }
301  else
302  {
303  tableSpace = matviewRel->rd_rel->reltablespace;
304  relpersistence = matviewRel->rd_rel->relpersistence;
305  }
306 
307  /*
308  * Create the transient table that will receive the regenerated data. Lock
309  * it against access by any other process until commit (by which time it
310  * will be gone).
311  */
312  OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
313  ExclusiveLock);
315  dest = CreateTransientRelDestReceiver(OIDNewHeap);
316 
317  /*
318  * Now lock down security-restricted operations.
319  */
320  SetUserIdAndSecContext(relowner,
321  save_sec_context | SECURITY_RESTRICTED_OPERATION);
322 
323  /* Generate the data, if wanted. */
324  if (!stmt->skipData)
325  refresh_matview_datafill(dest, dataQuery, queryString);
326 
327  heap_close(matviewRel, NoLock);
328 
329  /* Make the matview match the newly generated data. */
330  if (concurrent)
331  {
332  int old_depth = matview_maintenance_depth;
333 
/* On error, put the maintenance depth back before re-throwing. */
334  PG_TRY();
335  {
336  refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
337  save_sec_context);
338  }
339  PG_CATCH();
340  {
341  matview_maintenance_depth = old_depth;
342  PG_RE_THROW();
343  }
344  PG_END_TRY();
345  Assert(matview_maintenance_depth == old_depth);
346  }
347  else
348  refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
349 
350  /* Roll back any GUC changes */
351  AtEOXact_GUC(false, save_nestlevel);
352 
353  /* Restore userid and security context */
354  SetUserIdAndSecContext(save_userid, save_sec_context);
355 
356  ObjectAddressSet(address, RelationRelationId, matviewOid);
357 
358  return address;
359 }
360 
361 /*
362  * refresh_matview_datafill
363  */
/*
 * Run the matview's defining query and send its output to `dest`.
 *
 *   dest        - receiver given to CreateQueryDesc() for the result tuples.
 *   query       - stored query; it is copied first so the original is left
 *                 untouched by lock acquisition and rewriting.
 *   queryString - source text given to CreateQueryDesc().
 *
 * Raises ERROR "unexpected rewrite result for REFRESH MATERIALIZED VIEW"
 * unless rewriting yields exactly one query.
 *
 * NOTE(review): the line carrying the function name and first parameters,
 * the statements under the "Check for user-requested abort" and snapshot
 * comments, part of the CreateQueryDesc() argument list, the ExecutorStart
 * call and a statement before the closing brace are absent from this
 * listing (gaps in the embedded line numbers).
 */
364 static void
366  const char *queryString)
367 {
368  List *rewritten;
369  PlannedStmt *plan;
370  QueryDesc *queryDesc;
371  Query *copied_query;
372 
373  /* Lock and rewrite, using a copy to preserve the original query. */
374  copied_query = copyObject(query);
375  AcquireRewriteLocks(copied_query, true, false);
376  rewritten = QueryRewrite(copied_query);
377 
378  /* SELECT should never rewrite to more or less than one SELECT query */
379  if (list_length(rewritten) != 1)
380  elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
381  query = (Query *) linitial(rewritten);
382 
383  /* Check for user-requested abort. */
385 
386  /* Plan the query which will generate data for the refresh. */
387  plan = pg_plan_query(query, 0, NULL);
388 
389  /*
390  * Use a snapshot with an updated command ID to ensure this query sees
391  * results of any previously executed queries. (This could only matter if
392  * the planner executed an allegedly-stable function that changed the
393  * database contents, but let's do it anyway to be safe.)
394  */
397 
398  /* Create a QueryDesc, redirecting output to our tuple receiver */
399  queryDesc = CreateQueryDesc(plan, queryString,
401  dest, NULL, 0);
402 
403  /* call ExecutorStart to prepare the plan for execution */
405 
406  /* run the plan */
407  ExecutorRun(queryDesc, ForwardScanDirection, 0L);
408 
409  /* and clean up */
410  ExecutorFinish(queryDesc);
411  ExecutorEnd(queryDesc);
412 
413  FreeQueryDesc(queryDesc);
414 
416 }
417 
/*
 * Build a DestReceiver whose callbacks are the transientrel_* functions of
 * this file and which remembers `transientoid` as the heap to write into.
 * The result is returned as a generic DestReceiver pointer.
 *
 * NOTE(review): the line with the function name/parameter and the statement
 * that allocates `self` are absent from this listing.  Presumably this is
 * CreateTransientRelDestReceiver(Oid transientoid), which
 * ExecRefreshMatView() calls -- confirm against upstream.
 */
418 DestReceiver *
420 {
422 
423  self->pub.receiveSlot = transientrel_receive;
424  self->pub.rStartup = transientrel_startup;
425  self->pub.rShutdown = transientrel_shutdown;
426  self->pub.rDestroy = transientrel_destroy;
427  self->pub.mydest = DestTransientRel;
428  self->transientoid = transientoid;
429 
430  return (DestReceiver *) self;
431 }
432 
433 /*
434  * transientrel_startup --- executor startup
435  */
/*
 * Opens the heap identified by myState->transientoid (NoLock) and fills in
 * the per-run fields of the receiver: the open relation, the command id to
 * stamp on inserted tuples, the heap_insert option bits and a bulk-insert
 * state.  `operation` and `typeinfo` are not used in the visible body.
 *
 * NOTE(review): the statement that gives hi_options its initial value
 * (before the |= below) and the statement under the smgr_targblock comment
 * are absent from this listing.
 */
436 static void
437 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
438 {
439  DR_transientrel *myState = (DR_transientrel *) self;
440  Relation transientrel;
441 
442  transientrel = heap_open(myState->transientoid, NoLock);
443 
444  /*
445  * Fill private fields of myState for use by later routines
446  */
447  myState->transientrel = transientrel;
448  myState->output_cid = GetCurrentCommandId(true);
449 
450  /*
451  * We can skip WAL-logging the insertions, unless PITR or streaming
452  * replication is in use. We can skip the FSM in any case.
453  */
455  if (!XLogIsNeeded())
456  myState->hi_options |= HEAP_INSERT_SKIP_WAL;
457  myState->bistate = GetBulkInsertState();
458 
459  /* Not using WAL requires smgr_targblock be initially invalid */
461 }
462 
463 /*
464  * transientrel_receive --- receive one tuple
465  */
/*
 * Materializes the slot into a heap tuple and inserts it into the transient
 * relation using the command id, option bits and bulk-insert state prepared
 * by transientrel_startup.  Always returns true.
 *
 * NOTE(review): the signature line is absent from this listing; the forward
 * declaration reads
 * transientrel_receive(TupleTableSlot *slot, DestReceiver *self).
 */
466 static bool
468 {
469  DR_transientrel *myState = (DR_transientrel *) self;
470  HeapTuple tuple;
471 
472  /*
473  * get the heap tuple out of the tuple table slot, making sure we have a
474  * writable copy
475  */
476  tuple = ExecMaterializeSlot(slot);
477 
478  heap_insert(myState->transientrel,
479  tuple,
480  myState->output_cid,
481  myState->hi_options,
482  myState->bistate);
483 
484  /* We know this is a newly created relation, so there are no indexes */
485 
486  return true;
487 }
488 
489 /*
490  * transientrel_shutdown --- executor end
491  */
/*
 * Releases the bulk-insert state, syncs the heap if the inserts skipped WAL,
 * closes the transient relation (NoLock) and clears the pointer to it in the
 * receiver state.
 *
 * NOTE(review): the signature line is absent from this listing; the forward
 * declaration reads transientrel_shutdown(DestReceiver *self).
 */
492 static void
494 {
495  DR_transientrel *myState = (DR_transientrel *) self;
496 
497  FreeBulkInsertState(myState->bistate);
498 
499  /* If we skipped using WAL, must heap_sync before commit */
500  if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
501  heap_sync(myState->transientrel);
502 
503  /* close transientrel, but keep lock until commit */
504  heap_close(myState->transientrel, NoLock);
505  myState->transientrel = NULL;
506 }
507 
508 /*
509  * transientrel_destroy --- release DestReceiver object
510  */
/*
 * Frees the receiver itself with pfree(); nothing else is released here.
 *
 * NOTE(review): the signature line is absent from this listing; the forward
 * declaration reads transientrel_destroy(DestReceiver *self).
 */
511 static void
513 {
514  pfree(self);
515 }
516 
517 
518 /*
519  * Given a qualified temporary table name, append an underscore followed by
520  * the given integer, to make a new table name based on the old one.
521  *
522  * This leaks memory through palloc(), which won't be cleaned up until the
523  * current memory context is freed.
524  */
525 static char *
526 make_temptable_name_n(char *tempname, int n)
527 {
528  StringInfoData namebuf;
529 
530  initStringInfo(&namebuf);
531  appendStringInfoString(&namebuf, tempname);
532  appendStringInfo(&namebuf, "_%d", n);
533  return namebuf.data;
534 }
535 
/*
 * Append "OPERATOR(schema.opname)" for the operator `opoid` to `buf`.
 *
 * The schema name is passed through quote_identifier(); the operator name is
 * appended as stored in pg_operator.  Raises ERROR "cache lookup failed for
 * operator %u" if the operator is not found, and asserts that its oprkind is
 * 'b' (presumably: a binary operator -- confirm against pg_operator docs).
 *
 * NOTE(review): the signature line is absent from this listing; the forward
 * declaration reads mv_GenerateOper(StringInfo buf, Oid opoid).
 */
536 static void
538 {
539  HeapTuple opertup;
540  Form_pg_operator operform;
541 
542  opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid));
543  if (!HeapTupleIsValid(opertup))
544  elog(ERROR, "cache lookup failed for operator %u", opoid);
545  operform = (Form_pg_operator) GETSTRUCT(opertup);
546  Assert(operform->oprkind == 'b');
547 
548  appendStringInfo(buf, "OPERATOR(%s.%s)",
549  quote_identifier(get_namespace_name(operform->oprnamespace)),
550  NameStr(operform->oprname));
551 
552  ReleaseSysCache(opertup);
553 }
554 
555 /*
556  * refresh_by_match_merge
557  *
558  * Refresh a materialized view with transactional semantics, while allowing
559  * concurrent reads.
560  *
561  * This is called after a new version of the data has been created in a
562  * temporary table. It performs a full outer join against the old version of
563  * the data, producing "diff" results. This join cannot work if there are any
564  * duplicated rows in either the old or new versions, in the sense that every
565  * column would compare as equal between the two rows. It does work correctly
566  * in the face of rows which have at least one NULL value, with all non-NULL
567  * columns equal. The behavior of NULLs on equality tests and on UNIQUE
568  * indexes turns out to be quite convenient here; the tests we need to make
569  * are consistent with default behavior. If there is at least one UNIQUE
570  * index on the materialized view, we have exactly the guarantee we need.
571  *
572  * The temporary table used to hold the diff results contains just the TID of
573  * the old record (if matched) and the ROW from the new table as a single
574  * column of complex record type (if matched).
575  *
576  * Once we have the diff table, we perform set-based DELETE and INSERT
577  * operations against the materialized view, and discard both temporary
578  * tables.
579  *
580  * Everything from the generation of the new data to applying the differences
581  * takes place under cover of an ExclusiveLock, since it seems as though we
582  * would want to prohibit not only concurrent REFRESH operations, but also
583  * incremental maintenance. It also doesn't seem reasonable or safe to allow
584  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
585  * this command.
586  */
/*
 * Arguments, as used by the visible body:
 *   matviewOid       - OID of the materialized view; opened with NoLock.
 *   tempOid          - OID of the temp table holding the new data; opened
 *                      with NoLock.
 *   relowner,
 *   save_sec_context - passed to SetUserIdAndSecContext(): the context is
 *                      SECURITY_LOCAL_USERID_CHANGE while the diff table is
 *                      created and SECURITY_RESTRICTED_OPERATION afterwards.
 *
 * All SQL is issued through SPI; any unexpected SPI result code raises
 * ERROR.  Duplicate rows in the new data raise
 * ERRCODE_CARDINALITY_VIOLATION.
 *
 * NOTE(review): this listing has dropped several lines of this function
 * (gaps in the embedded line numbers): the starts of the assignments to
 * matviewname and tempname, the errdetail() argument, the assignment to
 * `op`, a statement just before the DELETE step and one just after the
 * INSERT step.  Restore them from upstream before relying on this copy.
 */
587 static void
588 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
589  int save_sec_context)
590 {
591  StringInfoData querybuf;
592  Relation matviewRel;
593  Relation tempRel;
594  char *matviewname;
595  char *tempname;
596  char *diffname;
597  TupleDesc tupdesc;
598  bool foundUniqueIndex;
599  List *indexoidlist;
600  ListCell *indexoidscan;
601  int16 relnatts;
602  bool *usedForQual;
603 
604  initStringInfo(&querybuf);
605  matviewRel = heap_open(matviewOid, NoLock);
607  RelationGetRelationName(matviewRel));
608  tempRel = heap_open(tempOid, NoLock);
610  RelationGetRelationName(tempRel));
611  diffname = make_temptable_name_n(tempname, 2);
612 
/* One flag per matview column, zero-initialized; indexed by attnum - 1. */
613  relnatts = matviewRel->rd_rel->relnatts;
614  usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
615 
616  /* Open SPI context. */
617  if (SPI_connect() != SPI_OK_CONNECT)
618  elog(ERROR, "SPI_connect failed");
619 
620  /* Analyze the temp table with the new contents. */
621  appendStringInfo(&querybuf, "ANALYZE %s", tempname);
622  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
623  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
624 
625  /*
626  * We need to ensure that there are not duplicate rows without NULLs in
627  * the new data set before we can count on the "diff" results. Check for
628  * that in a way that allows showing the first duplicated row found. Even
629  * after we pass this test, a unique index on the materialized view may
630  * find a duplicate key problem.
631  */
632  resetStringInfo(&querybuf);
633  appendStringInfo(&querybuf,
634  "SELECT newdata FROM %s newdata "
635  "WHERE newdata IS NOT NULL AND EXISTS "
636  "(SELECT * FROM %s newdata2 WHERE newdata2 IS NOT NULL "
637  "AND newdata2 OPERATOR(pg_catalog.*=) newdata "
638  "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
639  "newdata.ctid) LIMIT 1",
640  tempname, tempname);
641  if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
642  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
643  if (SPI_processed > 0)
644  {
645  /*
646  * Note that this ereport() is returning data to the user. Generally,
647  * we would want to make sure that the user has been granted access to
648  * this data. However, REFRESH MAT VIEW is only able to be run by the
649  * owner of the mat view (or a superuser) and therefore there is no
650  * need to check for access to data in the mat view.
651  */
652  ereport(ERROR,
653  (errcode(ERRCODE_CARDINALITY_VIOLATION),
654  errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
655  RelationGetRelationName(matviewRel)),
656  errdetail("Row: %s",
658  }
659 
660  SetUserIdAndSecContext(relowner,
661  save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
662 
663  /* Start building the query for creating the diff table. */
664  resetStringInfo(&querybuf);
665  appendStringInfo(&querybuf,
666  "CREATE TEMP TABLE %s AS "
667  "SELECT mv.ctid AS tid, newdata "
668  "FROM %s mv FULL JOIN %s newdata ON (",
669  diffname, matviewname, tempname);
670 
671  /*
672  * Get the list of index OIDs for the table from the relcache, and look up
673  * each one in the pg_index syscache. We will test for equality on all
674  * columns present in all unique indexes which only reference columns and
675  * include all rows.
676  */
677  tupdesc = matviewRel->rd_att;
678  foundUniqueIndex = false;
679  indexoidlist = RelationGetIndexList(matviewRel);
680 
681  foreach(indexoidscan, indexoidlist)
682  {
683  Oid indexoid = lfirst_oid(indexoidscan);
684  Relation indexRel;
685  Form_pg_index indexStruct;
686 
687  indexRel = index_open(indexoid, RowExclusiveLock);
688  indexStruct = indexRel->rd_index;
689 
690  /*
691  * We're only interested if it is unique, valid, contains no
692  * expressions, and is not partial.
693  */
694  if (indexStruct->indisunique &&
695  IndexIsValid(indexStruct) &&
696  RelationGetIndexExpressions(indexRel) == NIL &&
697  RelationGetIndexPredicate(indexRel) == NIL)
698  {
699  int numatts = indexStruct->indnatts;
700  int i;
701 
702  /* Add quals for all columns from this index. */
703  for (i = 0; i < numatts; i++)
704  {
705  int attnum = indexStruct->indkey.values[i];
706  Oid type;
707  Oid op;
708  const char *colname;
709 
710  /*
711  * Only include the column once regardless of how many times
712  * it shows up in how many indexes.
713  */
714  if (usedForQual[attnum - 1])
715  continue;
716  usedForQual[attnum - 1] = true;
717 
718  /*
719  * Actually add the qual, ANDed with any others.
720  */
721  if (foundUniqueIndex)
722  appendStringInfoString(&querybuf, " AND ");
723 
724  colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname));
725  appendStringInfo(&querybuf, "newdata.%s ", colname);
726  type = attnumTypeId(matviewRel, attnum);
/* NOTE(review): `op` is not assigned in the visible text before this use. */
728  mv_GenerateOper(&querybuf, op);
729  appendStringInfo(&querybuf, " mv.%s", colname);
730 
731  foundUniqueIndex = true;
732  }
733  }
734 
735  /* Keep the locks, since we're about to run DML which needs them. */
736  index_close(indexRel, NoLock);
737  }
738 
739  list_free(indexoidlist);
740 
741  /*
742  * There must be at least one unique index on the matview.
743  *
744  * ExecRefreshMatView() checks that after taking the exclusive lock on the
745  * matview. So at least one unique index is guaranteed to exist here
746  * because the lock is still being held.
747  */
748  Assert(foundUniqueIndex);
749 
750  appendStringInfoString(&querybuf,
751  " AND newdata OPERATOR(pg_catalog.*=) mv) "
752  "WHERE newdata IS NULL OR mv IS NULL "
753  "ORDER BY tid");
754 
755  /* Create the temporary "diff" table. */
756  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
757  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
758 
759  SetUserIdAndSecContext(relowner,
760  save_sec_context | SECURITY_RESTRICTED_OPERATION);
761 
762  /*
763  * We have no further use for data from the "full-data" temp table, but we
764  * must keep it around because its type is referenced from the diff table.
765  */
766 
767  /* Analyze the diff table. */
768  resetStringInfo(&querybuf);
769  appendStringInfo(&querybuf, "ANALYZE %s", diffname);
770  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
771  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
772 
774 
775  /* Deletes must come before inserts; do them first. */
776  resetStringInfo(&querybuf);
777  appendStringInfo(&querybuf,
778  "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
779  "(SELECT diff.tid FROM %s diff "
780  "WHERE diff.tid IS NOT NULL "
781  "AND diff.newdata IS NULL)",
782  matviewname, diffname);
783  if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
784  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
785 
786  /* Inserts go last. */
787  resetStringInfo(&querybuf);
788  appendStringInfo(&querybuf,
789  "INSERT INTO %s SELECT (diff.newdata).* "
790  "FROM %s diff WHERE tid IS NULL",
791  matviewname, diffname);
792  if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
793  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
794 
795  /* We're done maintaining the materialized view. */
797  heap_close(tempRel, NoLock);
798  heap_close(matviewRel, NoLock);
799 
800  /* Clean up temp tables. */
801  resetStringInfo(&querybuf);
802  appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
803  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
804  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
805 
806  /* Close SPI context. */
807  if (SPI_finish() != SPI_OK_FINISH)
808  elog(ERROR, "SPI_finish failed");
809 }
810 
811 /*
812  * Swap the physical files of the target and transient tables, then rebuild
813  * the target's indexes and throw away the transient table. Security context
814  * swapping is handled by the called function, so it is not needed here.
815  */
816 static void
817 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
818 {
819  finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
820  RecentXmin, ReadNextMultiXactId(), relpersistence);
821 }
822 
823 
824 /*
825  * This should be used to test whether the backend is in a context where it is
826  * OK to allow DML statements to modify materialized views. We only want to
827  * allow that for internal code driven by the materialized view definition,
828  * not for arbitrary user-supplied code.
829  *
830  * While the function names reflect the fact that their main intended use is
831  * incremental maintenance of materialized views (in response to changes to
832  * the data in referenced relations), they are initially used to allow REFRESH
833  * without blocking concurrent reads.
834  */
/*
 * Returns true exactly when matview_maintenance_depth is greater than zero.
 *
 * NOTE(review): the line with the function name is absent from this listing;
 * the cross-reference index at the end of the page gives it as
 * MatViewIncrementalMaintenanceIsEnabled(void).
 */
835 bool
837 {
838  return matview_maintenance_depth > 0;
839 }
840 
841 static void
843 {
845 }
846 
847 static void
849 {
852 }
signed short int16
Definition: c.h:252
#define RelationIsPopulated(relation)
Definition: rel.h:549
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:408
#define SPI_OK_CONNECT
Definition: spi.h:47
void RangeVarCallbackOwnsTable(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg)
Definition: tablecmds.c:12585
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:776
List * QueryRewrite(Query *parsetree)
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
int numLocks
Definition: prs2lock.h:42
#define IndexIsValid(indexForm)
Definition: pg_index.h:107
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition: cluster.c:1467
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:9968
#define SECURITY_RESTRICTED_OPERATION
Definition: miscadmin.h:292
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition: miscinit.c:394
DestReceiver pub
Definition: matview.c:47
int LOCKMODE
Definition: lockdefs.h:26
#define HEAP_INSERT_FROZEN
Definition: heapam.h:30
int SPI_connect(void)
Definition: spi.c:84
#define castNode(_type_, nodeptr)
Definition: nodes.h:577
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:100
Oid RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, bool nowait, RangeVarGetRelidCallback callback, void *callback_arg)
Definition: namespace.c:217
#define ExclusiveLock
Definition: lockdefs.h:44
Oid transientoid
Definition: matview.c:48
bool MatViewIncrementalMaintenanceIsEnabled(void)
Definition: matview.c:836
#define SPI_OK_DELETE
Definition: spi.h:54
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: matview.c:437
#define RelationRelationId
Definition: pg_class.h:29
#define XLogIsNeeded()
Definition: xlog.h:145
int SPI_finish(void)
Definition: spi.c:147
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:138
Form_pg_attribute * attrs
Definition: tupdesc.h:74
#define RELKIND_MATVIEW
Definition: pg_class.h:167
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:834
#define AccessShareLock
Definition: lockdefs.h:36
SPITupleTable * SPI_tuptable
Definition: spi.c:41
#define TYPECACHE_EQ_OPR
Definition: typcache.h:110
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
Definition: execMain.c:285
static void CloseMatViewIncrementalMaintenance(void)
Definition: matview.c:848
int errcode(int sqlerrcode)
Definition: elog.c:575
TransactionId RecentXmin
Definition: snapmgr.c:165
void heap_sync(Relation rel)
Definition: heapam.c:9122
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
#define heap_close(r, l)
Definition: heapam.h:97
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, int instrument_options)
Definition: pquery.c:66
Form_pg_class rd_rel
Definition: rel.h:113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
HeapTuple * vals
Definition: spi.h:27
#define RelationGetTargetBlock(relation)
Definition: rel.h:485
bool isInstead
Definition: prs2lock.h:31
RangeVar * relation
Definition: parsenodes.h:3036
uint64 SPI_processed
Definition: spi.c:39
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:149
Oid GetDefaultTablespace(char relpersistence)
Definition: tablespace.c:1111
Definition: localtime.c:74
List * RelationGetIndexPredicate(Relation relation)
Definition: relcache.c:4676
void * copyObject(const void *from)
Definition: copyfuncs.c:4475
static char * make_temptable_name_n(char *tempname, int n)
Definition: matview.c:526
char * SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
Definition: spi.c:803
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:439
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:2306
Form_pg_index rd_index
Definition: rel.h:155
void pfree(void *pointer)
Definition: mcxt.c:992
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
#define linitial(l)
Definition: pg_list.h:110
#define EXEC_FLAG_WITHOUT_OIDS
Definition: executor.h:64
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
int SPI_exec(const char *src, long tcount)
Definition: spi.c:331
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:764
ItemPointerData t_self
Definition: htup.h:65
static void transientrel_destroy(DestReceiver *self)
Definition: matview.c:512
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:189
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3006
#define NoLock
Definition: lockdefs.h:34
static char * buf
Definition: pg_test_fsync.c:65
List * RelationGetIndexExpressions(Relation relation)
Definition: relcache.c:4613
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition: miscinit.c:387
#define RowExclusiveLock
Definition: lockdefs.h:38
CmdType event
Definition: prs2lock.h:27
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition: guc.c:5036
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define SPI_OK_UTILITY
Definition: spi.h:50
static void transientrel_shutdown(DestReceiver *self)
Definition: matview.c:493
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
Definition: matview.c:467
Oid attnumTypeId(Relation rd, int attid)
static void refresh_matview_datafill(DestReceiver *dest, Query *query, const char *queryString)
Definition: matview.c:365
#define RelationGetRelationName(relation)
Definition: rel.h:433
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
RewriteRule ** rules
Definition: prs2lock.h:43
void AcquireRewriteLocks(Query *parsetree, bool forExecute, bool forUpdatePushedDown)
List * actions
Definition: prs2lock.h:29
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition: tablecmds.c:2950
#define ereport(elevel, rest)
Definition: elog.h:122
Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate)
Definition: heapam.c:2383
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:379
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
Definition: matview.c:817
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10054
FormData_pg_index * Form_pg_index
Definition: pg_index.h:67
#define InvalidSnapshot
Definition: snapshot.h:25
void * palloc0(Size size)
Definition: mcxt.c:920
void CommandCounterIncrement(void)
Definition: xact.c:921
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1083
static struct state * newstate(struct nfa *nfa)
Definition: regc_nfa.c:124
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
TupleDesc tupdesc
Definition: spi.h:26
#define SECURITY_LOCAL_USERID_CHANGE
Definition: miscadmin.h:291
TupleDesc rd_att
Definition: rel.h:114
Relation transientrel
Definition: matview.c:50
#define SPI_OK_SELECT
Definition: spi.h:51
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:191
int hi_options
Definition: matview.c:52
#define PG_CATCH()
Definition: elog.h:293
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:2320
RuleLock * rd_rules
Definition: rel.h:117
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
#define InvalidBlockNumber
Definition: block.h:33
static int list_length(const List *l)
Definition: pg_list.h:89
#define SPI_OK_FINISH
Definition: spi.h:48
static void OpenMatViewIncrementalMaintenance(void)
Definition: matview.c:842
#define PG_RE_THROW()
Definition: elog.h:314
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:725
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4336
FormData_pg_operator* Form_pg_operator
Definition: pg_operator.h:57
static void mv_GenerateOper(StringInfo buf, Oid opoid)
Definition: matview.c:537
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:176
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:29
FormData_pg_class * Form_pg_class
Definition: pg_class.h:95
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:158
#define AccessExclusiveLock
Definition: lockdefs.h:46
DestReceiver * CreateTransientRelDestReceiver(Oid transientoid)
Definition: matview.c:419
int NewGUCNestLevel(void)
Definition: guc.c:5022
int errmsg(const char *fmt,...)
Definition: elog.c:797
static int matview_maintenance_depth
Definition: matview.c:56
void list_free(List *list)
Definition: list.c:1133
int i
#define NameStr(name)
Definition: c.h:495
ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, char *completionTag)
Definition: matview.c:136
#define SPI_OK_INSERT
Definition: spi.h:53
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
CommandId output_cid
Definition: matview.c:51
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:686
#define elog
Definition: elog.h:219
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:105
#define RELPERSISTENCE_TEMP
Definition: pg_class.h:172
#define PG_TRY()
Definition: elog.h:284
Definition: pg_list.h:45
#define RelationGetRelid(relation)
Definition: rel.h:413
void SetMatViewPopulatedState(Relation relation, bool newstate)
Definition: matview.c:82
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:151
#define PG_END_TRY()
Definition: elog.h:300
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence, LOCKMODE lockmode)
Definition: cluster.c:606
#define lfirst_oid(lc)
Definition: pg_list.h:108
BulkInsertState bistate
Definition: matview.c:53
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, int save_sec_context)
Definition: matview.c:588
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:303
MultiXactId ReadNextMultiXactId(void)
Definition: multixact.c:721
#define RelationGetNamespace(relation)
Definition: rel.h:440
PlannedStmt * pg_plan_query(Query *querytree, int cursorOptions, ParamListInfo boundParams)
Definition: postgres.c:781