PostgreSQL Source Code  git master
matview.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * matview.c
4  * materialized view support
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/matview.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/multixact.h"
21 #include "access/tableam.h"
22 #include "access/xact.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_am.h"
26 #include "catalog/pg_opclass.h"
27 #include "commands/cluster.h"
28 #include "commands/matview.h"
29 #include "commands/tablecmds.h"
30 #include "commands/tablespace.h"
31 #include "executor/executor.h"
32 #include "executor/spi.h"
33 #include "miscadmin.h"
34 #include "pgstat.h"
35 #include "rewrite/rewriteHandler.h"
36 #include "storage/lmgr.h"
37 #include "tcop/tcopprot.h"
38 #include "utils/builtins.h"
39 #include "utils/lsyscache.h"
40 #include "utils/rel.h"
41 #include "utils/snapmgr.h"
42 #include "utils/syscache.h"
43 
44 
45 typedef struct
46 {
47  DestReceiver pub; /* publicly-known function pointers */
48  Oid transientoid; /* OID of new heap into which to store */
49  /* These fields are filled by transientrel_startup: */
50  Relation transientrel; /* relation to write to */
51  CommandId output_cid; /* cmin to insert in output tuples */
52  int ti_options; /* table_tuple_insert performance options */
53  BulkInsertState bistate; /* bulk insert state */
55 
57 
58 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
59 static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
60 static void transientrel_shutdown(DestReceiver *self);
61 static void transientrel_destroy(DestReceiver *self);
62 static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
63  const char *queryString);
64 static char *make_temptable_name_n(char *tempname, int n);
65 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
66  int save_sec_context);
67 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
68 static bool is_usable_unique_index(Relation indexRel);
69 static void OpenMatViewIncrementalMaintenance(void);
70 static void CloseMatViewIncrementalMaintenance(void);
71 
72 /*
73  * SetMatViewPopulatedState
74  * Mark a materialized view as populated, or not.
75  *
76  * NOTE: caller must be holding an appropriate lock on the relation.
77  */
78 void
80 {
81  Relation pgrel;
82  HeapTuple tuple;
83 
84  Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
85 
86  /*
87  * Update relation's pg_class entry. Crucial side-effect: other backends
88  * (and this one too!) are sent SI message to make them rebuild relcache
89  * entries.
90  */
91  pgrel = table_open(RelationRelationId, RowExclusiveLock);
92  tuple = SearchSysCacheCopy1(RELOID,
94  if (!HeapTupleIsValid(tuple))
95  elog(ERROR, "cache lookup failed for relation %u",
96  RelationGetRelid(relation));
97 
98  ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
99 
100  CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
101 
102  heap_freetuple(tuple);
104 
105  /*
106  * Advance command counter to make the updated pg_class row locally
107  * visible.
108  */
110 }
111 
112 /*
113  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
114  *
115  * This refreshes the materialized view by creating a new table and swapping
116  * the relfilenumbers of the new table and the old materialized view, so the OID
117  * of the original materialized view is preserved. Thus we do not lose GRANT
118  * nor references to this materialized view.
119  *
120  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
121  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
122  * statement associated with the materialized view. The statement node's
123  * skipData field shows whether the clause was used.
124  *
125  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
126  * the new heap, it's better to create the indexes afterwards than to fill them
127  * incrementally while we load.
128  *
129  * The matview's "populated" state is changed based on whether the contents
130  * reflect the result set of the materialized view's query.
131  */
133 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
134  ParamListInfo params, QueryCompletion *qc)
135 {
136  Oid matviewOid;
137  Relation matviewRel;
138  RewriteRule *rule;
139  List *actions;
140  Query *dataQuery;
141  Oid tableSpace;
142  Oid relowner;
143  Oid OIDNewHeap;
145  uint64 processed = 0;
146  bool concurrent;
147  LOCKMODE lockmode;
148  char relpersistence;
149  Oid save_userid;
150  int save_sec_context;
151  int save_nestlevel;
152  ObjectAddress address;
153 
154  /* Determine strength of lock needed. */
155  concurrent = stmt->concurrent;
156  lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
157 
158  /*
159  * Get a lock until end of transaction.
160  */
161  matviewOid = RangeVarGetRelidExtended(stmt->relation,
162  lockmode, 0,
164  NULL);
165  matviewRel = table_open(matviewOid, NoLock);
166  relowner = matviewRel->rd_rel->relowner;
167 
168  /*
169  * Switch to the owner's userid, so that any functions are run as that
170  * user. Also lock down security-restricted operations and arrange to
171  * make GUC variable changes local to this command.
172  */
173  GetUserIdAndSecContext(&save_userid, &save_sec_context);
174  SetUserIdAndSecContext(relowner,
175  save_sec_context | SECURITY_RESTRICTED_OPERATION);
176  save_nestlevel = NewGUCNestLevel();
178 
179  /* Make sure it is a materialized view. */
180  if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
181  ereport(ERROR,
182  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
183  errmsg("\"%s\" is not a materialized view",
184  RelationGetRelationName(matviewRel))));
185 
186  /* Check that CONCURRENTLY is not specified if not populated. */
187  if (concurrent && !RelationIsPopulated(matviewRel))
188  ereport(ERROR,
189  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
190  errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
191 
192  /* Check that conflicting options have not been specified. */
193  if (concurrent && stmt->skipData)
194  ereport(ERROR,
195  (errcode(ERRCODE_SYNTAX_ERROR),
196  errmsg("%s and %s options cannot be used together",
197  "CONCURRENTLY", "WITH NO DATA")));
198 
199  /*
200  * Check that everything is correct for a refresh. Problems at this point
201  * are internal errors, so elog is sufficient.
202  */
203  if (matviewRel->rd_rel->relhasrules == false ||
204  matviewRel->rd_rules->numLocks < 1)
205  elog(ERROR,
206  "materialized view \"%s\" is missing rewrite information",
207  RelationGetRelationName(matviewRel));
208 
209  if (matviewRel->rd_rules->numLocks > 1)
210  elog(ERROR,
211  "materialized view \"%s\" has too many rules",
212  RelationGetRelationName(matviewRel));
213 
214  rule = matviewRel->rd_rules->rules[0];
215  if (rule->event != CMD_SELECT || !(rule->isInstead))
216  elog(ERROR,
217  "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
218  RelationGetRelationName(matviewRel));
219 
220  actions = rule->actions;
221  if (list_length(actions) != 1)
222  elog(ERROR,
223  "the rule for materialized view \"%s\" is not a single action",
224  RelationGetRelationName(matviewRel));
225 
226  /*
227  * Check that there is a unique index with no WHERE clause on one or more
228  * columns of the materialized view if CONCURRENTLY is specified.
229  */
230  if (concurrent)
231  {
232  List *indexoidlist = RelationGetIndexList(matviewRel);
233  ListCell *indexoidscan;
234  bool hasUniqueIndex = false;
235 
236  foreach(indexoidscan, indexoidlist)
237  {
238  Oid indexoid = lfirst_oid(indexoidscan);
239  Relation indexRel;
240 
241  indexRel = index_open(indexoid, AccessShareLock);
242  hasUniqueIndex = is_usable_unique_index(indexRel);
243  index_close(indexRel, AccessShareLock);
244  if (hasUniqueIndex)
245  break;
246  }
247 
248  list_free(indexoidlist);
249 
250  if (!hasUniqueIndex)
251  ereport(ERROR,
252  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
253  errmsg("cannot refresh materialized view \"%s\" concurrently",
255  RelationGetRelationName(matviewRel))),
256  errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
257  }
258 
259  /*
260  * The stored query was rewritten at the time of the MV definition, but
261  * has not been scribbled on by the planner.
262  */
263  dataQuery = linitial_node(Query, actions);
264 
265  /*
266  * Check for active uses of the relation in the current transaction, such
267  * as open scans.
268  *
269  * NB: We count on this to protect us against problems with refreshing the
270  * data using TABLE_INSERT_FROZEN.
271  */
272  CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
273 
274  /*
275  * Tentatively mark the matview as populated or not (this will roll back
276  * if we fail later).
277  */
278  SetMatViewPopulatedState(matviewRel, !stmt->skipData);
279 
280  /* Concurrent refresh builds new data in temp tablespace, and does diff. */
281  if (concurrent)
282  {
283  tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
284  relpersistence = RELPERSISTENCE_TEMP;
285  }
286  else
287  {
288  tableSpace = matviewRel->rd_rel->reltablespace;
289  relpersistence = matviewRel->rd_rel->relpersistence;
290  }
291 
292  /*
293  * Create the transient table that will receive the regenerated data. Lock
294  * it against access by any other process until commit (by which time it
295  * will be gone).
296  */
297  OIDNewHeap = make_new_heap(matviewOid, tableSpace,
298  matviewRel->rd_rel->relam,
299  relpersistence, ExclusiveLock);
301  dest = CreateTransientRelDestReceiver(OIDNewHeap);
302 
303  /* Generate the data, if wanted. */
304  if (!stmt->skipData)
305  processed = refresh_matview_datafill(dest, dataQuery, queryString);
306 
307  /* Make the matview match the newly generated data. */
308  if (concurrent)
309  {
310  int old_depth = matview_maintenance_depth;
311 
312  PG_TRY();
313  {
314  refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
315  save_sec_context);
316  }
317  PG_CATCH();
318  {
319  matview_maintenance_depth = old_depth;
320  PG_RE_THROW();
321  }
322  PG_END_TRY();
323  Assert(matview_maintenance_depth == old_depth);
324  }
325  else
326  {
327  refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
328 
329  /*
330  * Inform cumulative stats system about our activity: basically, we
331  * truncated the matview and inserted some new data. (The concurrent
332  * code path above doesn't need to worry about this because the
333  * inserts and deletes it issues get counted by lower-level code.)
334  */
335  pgstat_count_truncate(matviewRel);
336  if (!stmt->skipData)
337  pgstat_count_heap_insert(matviewRel, processed);
338  }
339 
340  table_close(matviewRel, NoLock);
341 
342  /* Roll back any GUC changes */
343  AtEOXact_GUC(false, save_nestlevel);
344 
345  /* Restore userid and security context */
346  SetUserIdAndSecContext(save_userid, save_sec_context);
347 
348  ObjectAddressSet(address, RelationRelationId, matviewOid);
349 
350  /*
351  * Save the rowcount so that pg_stat_statements can track the total number
352  * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
353  * still don't display the rowcount in the command completion tag output,
354  * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
355  * command tag is left false in cmdtaglist.h. Otherwise, the change of
356  * completion tag output might break applications using it.
357  */
358  if (qc)
359  SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
360 
361  return address;
362 }
363 
364 /*
365  * refresh_matview_datafill
366  *
367  * Execute the given query, sending result rows to "dest" (which will
368  * insert them into the target matview).
369  *
370  * Returns number of rows inserted.
371  */
372 static uint64
374  const char *queryString)
375 {
376  List *rewritten;
377  PlannedStmt *plan;
378  QueryDesc *queryDesc;
379  Query *copied_query;
380  uint64 processed;
381 
382  /* Lock and rewrite, using a copy to preserve the original query. */
383  copied_query = copyObject(query);
384  AcquireRewriteLocks(copied_query, true, false);
385  rewritten = QueryRewrite(copied_query);
386 
387  /* SELECT should never rewrite to more or less than one SELECT query */
388  if (list_length(rewritten) != 1)
389  elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
390  query = (Query *) linitial(rewritten);
391 
392  /* Check for user-requested abort. */
394 
395  /* Plan the query which will generate data for the refresh. */
396  plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
397 
398  /*
399  * Use a snapshot with an updated command ID to ensure this query sees
400  * results of any previously executed queries. (This could only matter if
401  * the planner executed an allegedly-stable function that changed the
402  * database contents, but let's do it anyway to be safe.)
403  */
406 
407  /* Create a QueryDesc, redirecting output to our tuple receiver */
408  queryDesc = CreateQueryDesc(plan, queryString,
410  dest, NULL, NULL, 0);
411 
412  /* call ExecutorStart to prepare the plan for execution */
413  ExecutorStart(queryDesc, 0);
414 
415  /* run the plan */
416  ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
417 
418  processed = queryDesc->estate->es_processed;
419 
420  /* and clean up */
421  ExecutorFinish(queryDesc);
422  ExecutorEnd(queryDesc);
423 
424  FreeQueryDesc(queryDesc);
425 
427 
428  return processed;
429 }
430 
431 DestReceiver *
433 {
435 
436  self->pub.receiveSlot = transientrel_receive;
437  self->pub.rStartup = transientrel_startup;
438  self->pub.rShutdown = transientrel_shutdown;
439  self->pub.rDestroy = transientrel_destroy;
440  self->pub.mydest = DestTransientRel;
441  self->transientoid = transientoid;
442 
443  return (DestReceiver *) self;
444 }
445 
446 /*
447  * transientrel_startup --- executor startup
448  */
449 static void
450 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
451 {
452  DR_transientrel *myState = (DR_transientrel *) self;
453  Relation transientrel;
454 
455  transientrel = table_open(myState->transientoid, NoLock);
456 
457  /*
458  * Fill private fields of myState for use by later routines
459  */
460  myState->transientrel = transientrel;
461  myState->output_cid = GetCurrentCommandId(true);
463  myState->bistate = GetBulkInsertState();
464 
465  /*
466  * Valid smgr_targblock implies something already wrote to the relation.
467  * This may be harmless, but this function hasn't planned for it.
468  */
470 }
471 
472 /*
473  * transientrel_receive --- receive one tuple
474  */
475 static bool
477 {
478  DR_transientrel *myState = (DR_transientrel *) self;
479 
480  /*
481  * Note that the input slot might not be of the type of the target
482  * relation. That's supported by table_tuple_insert(), but slightly less
483  * efficient than inserting with the right slot - but the alternative
484  * would be to copy into a slot of the right type, which would not be
485  * cheap either. This also doesn't allow accessing per-AM data (say a
486  * tuple's xmin), but since we don't do that here...
487  */
488 
490  slot,
491  myState->output_cid,
492  myState->ti_options,
493  myState->bistate);
494 
495  /* We know this is a newly created relation, so there are no indexes */
496 
497  return true;
498 }
499 
500 /*
501  * transientrel_shutdown --- executor end
502  */
503 static void
505 {
506  DR_transientrel *myState = (DR_transientrel *) self;
507 
508  FreeBulkInsertState(myState->bistate);
509 
511 
512  /* close transientrel, but keep lock until commit */
513  table_close(myState->transientrel, NoLock);
514  myState->transientrel = NULL;
515 }
516 
517 /*
518  * transientrel_destroy --- release DestReceiver object
519  */
520 static void
522 {
523  pfree(self);
524 }
525 
526 
527 /*
528  * Given a qualified temporary table name, append an underscore followed by
529  * the given integer, to make a new table name based on the old one.
530  * The result is a palloc'd string.
531  *
532  * As coded, this would fail to make a valid SQL name if the given name were,
533  * say, "FOO"."BAR". Currently, the table name portion of the input will
534  * never be double-quoted because it's of the form "pg_temp_NNN", cf
535  * make_new_heap(). But we might have to work harder someday.
536  */
537 static char *
538 make_temptable_name_n(char *tempname, int n)
539 {
540  StringInfoData namebuf;
541 
542  initStringInfo(&namebuf);
543  appendStringInfoString(&namebuf, tempname);
544  appendStringInfo(&namebuf, "_%d", n);
545  return namebuf.data;
546 }
547 
548 /*
549  * refresh_by_match_merge
550  *
551  * Refresh a materialized view with transactional semantics, while allowing
552  * concurrent reads.
553  *
554  * This is called after a new version of the data has been created in a
555  * temporary table. It performs a full outer join against the old version of
556  * the data, producing "diff" results. This join cannot work if there are any
557  * duplicated rows in either the old or new versions, in the sense that every
558  * column would compare as equal between the two rows. It does work correctly
559  * in the face of rows which have at least one NULL value, with all non-NULL
560  * columns equal. The behavior of NULLs on equality tests and on UNIQUE
561  * indexes turns out to be quite convenient here; the tests we need to make
562  * are consistent with default behavior. If there is at least one UNIQUE
563  * index on the materialized view, we have exactly the guarantee we need.
564  *
565  * The temporary table used to hold the diff results contains just the TID of
566  * the old record (if matched) and the ROW from the new table as a single
567  * column of complex record type (if matched).
568  *
569  * Once we have the diff table, we perform set-based DELETE and INSERT
570  * operations against the materialized view, and discard both temporary
571  * tables.
572  *
573  * Everything from the generation of the new data to applying the differences
574  * takes place under cover of an ExclusiveLock, since it seems as though we
575  * would want to prohibit not only concurrent REFRESH operations, but also
576  * incremental maintenance. It also doesn't seem reasonable or safe to allow
577  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
578  * this command.
579  */
580 static void
581 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
582  int save_sec_context)
583 {
584  StringInfoData querybuf;
585  Relation matviewRel;
586  Relation tempRel;
587  char *matviewname;
588  char *tempname;
589  char *diffname;
590  TupleDesc tupdesc;
591  bool foundUniqueIndex;
592  List *indexoidlist;
593  ListCell *indexoidscan;
594  int16 relnatts;
595  Oid *opUsedForQual;
596 
597  initStringInfo(&querybuf);
598  matviewRel = table_open(matviewOid, NoLock);
600  RelationGetRelationName(matviewRel));
601  tempRel = table_open(tempOid, NoLock);
603  RelationGetRelationName(tempRel));
604  diffname = make_temptable_name_n(tempname, 2);
605 
606  relnatts = RelationGetNumberOfAttributes(matviewRel);
607 
608  /* Open SPI context. */
609  if (SPI_connect() != SPI_OK_CONNECT)
610  elog(ERROR, "SPI_connect failed");
611 
612  /* Analyze the temp table with the new contents. */
613  appendStringInfo(&querybuf, "ANALYZE %s", tempname);
614  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
615  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
616 
617  /*
618  * We need to ensure that there are not duplicate rows without NULLs in
619  * the new data set before we can count on the "diff" results. Check for
620  * that in a way that allows showing the first duplicated row found. Even
621  * after we pass this test, a unique index on the materialized view may
622  * find a duplicate key problem.
623  *
624  * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
625  * keep ".*" from being expanded into multiple columns in a SELECT list.
626  * Compare ruleutils.c's get_variable().
627  */
628  resetStringInfo(&querybuf);
629  appendStringInfo(&querybuf,
630  "SELECT newdata.*::%s FROM %s newdata "
631  "WHERE newdata.* IS NOT NULL AND EXISTS "
632  "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
633  "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
634  "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
635  "newdata.ctid)",
636  tempname, tempname, tempname);
637  if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
638  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
639  if (SPI_processed > 0)
640  {
641  /*
642  * Note that this ereport() is returning data to the user. Generally,
643  * we would want to make sure that the user has been granted access to
644  * this data. However, REFRESH MAT VIEW is only able to be run by the
645  * owner of the mat view (or a superuser) and therefore there is no
646  * need to check for access to data in the mat view.
647  */
648  ereport(ERROR,
649  (errcode(ERRCODE_CARDINALITY_VIOLATION),
650  errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
651  RelationGetRelationName(matviewRel)),
652  errdetail("Row: %s",
654  }
655 
656  /*
657  * Create the temporary "diff" table.
658  *
659  * Temporarily switch out of the SECURITY_RESTRICTED_OPERATION context,
660  * because you cannot create temp tables in SRO context. For extra
661  * paranoia, add the composite type column only after switching back to
662  * SRO context.
663  */
664  SetUserIdAndSecContext(relowner,
665  save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
666  resetStringInfo(&querybuf);
667  appendStringInfo(&querybuf,
668  "CREATE TEMP TABLE %s (tid pg_catalog.tid)",
669  diffname);
670  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
671  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
672  SetUserIdAndSecContext(relowner,
673  save_sec_context | SECURITY_RESTRICTED_OPERATION);
674  resetStringInfo(&querybuf);
675  appendStringInfo(&querybuf,
676  "ALTER TABLE %s ADD COLUMN newdata %s",
677  diffname, tempname);
678  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
679  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
680 
681  /* Start building the query for populating the diff table. */
682  resetStringInfo(&querybuf);
683  appendStringInfo(&querybuf,
684  "INSERT INTO %s "
685  "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
686  "FROM %s mv FULL JOIN %s newdata ON (",
687  diffname, tempname, matviewname, tempname);
688 
689  /*
690  * Get the list of index OIDs for the table from the relcache, and look up
691  * each one in the pg_index syscache. We will test for equality on all
692  * columns present in all unique indexes which only reference columns and
693  * include all rows.
694  */
695  tupdesc = matviewRel->rd_att;
696  opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
697  foundUniqueIndex = false;
698 
699  indexoidlist = RelationGetIndexList(matviewRel);
700 
701  foreach(indexoidscan, indexoidlist)
702  {
703  Oid indexoid = lfirst_oid(indexoidscan);
704  Relation indexRel;
705 
706  indexRel = index_open(indexoid, RowExclusiveLock);
707  if (is_usable_unique_index(indexRel))
708  {
709  Form_pg_index indexStruct = indexRel->rd_index;
710  int indnkeyatts = indexStruct->indnkeyatts;
711  oidvector *indclass;
712  Datum indclassDatum;
713  int i;
714 
715  /* Must get indclass the hard way. */
716  indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
717  indexRel->rd_indextuple,
718  Anum_pg_index_indclass);
719  indclass = (oidvector *) DatumGetPointer(indclassDatum);
720 
721  /* Add quals for all columns from this index. */
722  for (i = 0; i < indnkeyatts; i++)
723  {
724  int attnum = indexStruct->indkey.values[i];
725  Oid opclass = indclass->values[i];
726  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
727  Oid attrtype = attr->atttypid;
728  HeapTuple cla_ht;
729  Form_pg_opclass cla_tup;
730  Oid opfamily;
731  Oid opcintype;
732  Oid op;
733  const char *leftop;
734  const char *rightop;
735 
736  /*
737  * Identify the equality operator associated with this index
738  * column. First we need to look up the column's opclass.
739  */
740  cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
741  if (!HeapTupleIsValid(cla_ht))
742  elog(ERROR, "cache lookup failed for opclass %u", opclass);
743  cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
744  Assert(cla_tup->opcmethod == BTREE_AM_OID);
745  opfamily = cla_tup->opcfamily;
746  opcintype = cla_tup->opcintype;
747  ReleaseSysCache(cla_ht);
748 
749  op = get_opfamily_member(opfamily, opcintype, opcintype,
751  if (!OidIsValid(op))
752  elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
753  BTEqualStrategyNumber, opcintype, opcintype, opfamily);
754 
755  /*
756  * If we find the same column with the same equality semantics
757  * in more than one index, we only need to emit the equality
758  * clause once.
759  *
760  * Since we only remember the last equality operator, this
761  * code could be fooled into emitting duplicate clauses given
762  * multiple indexes with several different opclasses ... but
763  * that's so unlikely it doesn't seem worth spending extra
764  * code to avoid.
765  */
766  if (opUsedForQual[attnum - 1] == op)
767  continue;
768  opUsedForQual[attnum - 1] = op;
769 
770  /*
771  * Actually add the qual, ANDed with any others.
772  */
773  if (foundUniqueIndex)
774  appendStringInfoString(&querybuf, " AND ");
775 
776  leftop = quote_qualified_identifier("newdata",
777  NameStr(attr->attname));
778  rightop = quote_qualified_identifier("mv",
779  NameStr(attr->attname));
780 
781  generate_operator_clause(&querybuf,
782  leftop, attrtype,
783  op,
784  rightop, attrtype);
785 
786  foundUniqueIndex = true;
787  }
788  }
789 
790  /* Keep the locks, since we're about to run DML which needs them. */
791  index_close(indexRel, NoLock);
792  }
793 
794  list_free(indexoidlist);
795 
796  /*
797  * There must be at least one usable unique index on the matview.
798  *
799  * ExecRefreshMatView() checks that after taking the exclusive lock on the
800  * matview. So at least one unique index is guaranteed to exist here
801  * because the lock is still being held. (One known exception is if a
802  * function called as part of refreshing the matview drops the index.
803  * That's a pretty silly thing to do.)
804  */
805  if (!foundUniqueIndex)
806  elog(ERROR, "could not find suitable unique index on materialized view");
807 
808  appendStringInfoString(&querybuf,
809  " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
810  "WHERE newdata.* IS NULL OR mv.* IS NULL "
811  "ORDER BY tid");
812 
813  /* Populate the temporary "diff" table. */
814  if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
815  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
816 
817  /*
818  * We have no further use for data from the "full-data" temp table, but we
819  * must keep it around because its type is referenced from the diff table.
820  */
821 
822  /* Analyze the diff table. */
823  resetStringInfo(&querybuf);
824  appendStringInfo(&querybuf, "ANALYZE %s", diffname);
825  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
826  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
827 
829 
830  /* Deletes must come before inserts; do them first. */
831  resetStringInfo(&querybuf);
832  appendStringInfo(&querybuf,
833  "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
834  "(SELECT diff.tid FROM %s diff "
835  "WHERE diff.tid IS NOT NULL "
836  "AND diff.newdata IS NULL)",
837  matviewname, diffname);
838  if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
839  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
840 
841  /* Inserts go last. */
842  resetStringInfo(&querybuf);
843  appendStringInfo(&querybuf,
844  "INSERT INTO %s SELECT (diff.newdata).* "
845  "FROM %s diff WHERE tid IS NULL",
846  matviewname, diffname);
847  if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
848  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
849 
850  /* We're done maintaining the materialized view. */
852  table_close(tempRel, NoLock);
853  table_close(matviewRel, NoLock);
854 
855  /* Clean up temp tables. */
856  resetStringInfo(&querybuf);
857  appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
858  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
859  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
860 
861  /* Close SPI context. */
862  if (SPI_finish() != SPI_OK_FINISH)
863  elog(ERROR, "SPI_finish failed");
864 }
865 
866 /*
867  * Swap the physical files of the target and transient tables, then rebuild
868  * the target's indexes and throw away the transient table. Security context
869  * swapping is handled by the called function, so it is not needed here.
870  */
871 static void
872 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
873 {
874  finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
875  RecentXmin, ReadNextMultiXactId(), relpersistence);
876 }
877 
878 /*
879  * Check whether specified index is usable for match merge.
880  */
881 static bool
883 {
884  Form_pg_index indexStruct = indexRel->rd_index;
885 
886  /*
887  * Must be unique, valid, immediate, non-partial, and be defined over
888  * plain user columns (not expressions). We also require it to be a
889  * btree. Even if we had any other unique index kinds, we'd not know how
890  * to identify the corresponding equality operator, nor could we be sure
891  * that the planner could implement the required FULL JOIN with non-btree
892  * operators.
893  */
894  if (indexStruct->indisunique &&
895  indexStruct->indimmediate &&
896  indexRel->rd_rel->relam == BTREE_AM_OID &&
897  indexStruct->indisvalid &&
898  RelationGetIndexPredicate(indexRel) == NIL &&
899  indexStruct->indnatts > 0)
900  {
901  /*
902  * The point of groveling through the index columns individually is to
903  * reject both index expressions and system columns. Currently,
904  * matviews couldn't have OID columns so there's no way to create an
905  * index on a system column; but maybe someday that wouldn't be true,
906  * so let's be safe.
907  */
908  int numatts = indexStruct->indnatts;
909  int i;
910 
911  for (i = 0; i < numatts; i++)
912  {
913  int attnum = indexStruct->indkey.values[i];
914 
915  if (attnum <= 0)
916  return false;
917  }
918  return true;
919  }
920  return false;
921 }
922 
923 
924 /*
925  * This should be used to test whether the backend is in a context where it is
926  * OK to allow DML statements to modify materialized views. We only want to
927  * allow that for internal code driven by the materialized view definition,
928  * not for arbitrary user-supplied code.
929  *
930  * While the function names reflect the fact that their main intended use is
931  * incremental maintenance of materialized views (in response to changes to
932  * the data in referenced relations), they are initially used to allow REFRESH
933  * without blocking concurrent reads.
934  */
935 bool
937 {
938  return matview_maintenance_depth > 0;
939 }
940 
941 static void
943 {
945 }
946 
947 static void
949 {
952 }
Oid GetDefaultTablespace(char relpersistence, bool partitioned)
Definition: tablespace.c:1143
#define InvalidBlockNumber
Definition: block.h:33
#define NameStr(name)
Definition: c.h:746
signed short int16
Definition: c.h:493
#define Assert(condition)
Definition: c.h:858
uint32 CommandId
Definition: c.h:666
#define OidIsValid(objectId)
Definition: c.h:775
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition: cluster.c:1432
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
Definition: cluster.c:688
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:38
@ DestTransientRel
Definition: dest.h:97
int errdetail(const char *fmt,...)
Definition: elog.c:1205
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:380
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:467
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:407
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:124
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once)
Definition: execMain.c:297
int NewGUCNestLevel(void)
Definition: guc.c:2237
void RestrictSearchPath(void)
Definition: guc.c:2248
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition: guc.c:2264
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:1923
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:1940
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define stmt
Definition: indent_codes.h:59
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
int i
Definition: isn.c:73
void list_free(List *list)
Definition: list.c:1546
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:108
int LOCKMODE
Definition: lockdefs.h:26
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ExclusiveLock
Definition: lockdefs.h:42
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:166
static void transientrel_destroy(DestReceiver *self)
Definition: matview.c:521
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: matview.c:450
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, const char *queryString)
Definition: matview.c:373
static char * make_temptable_name_n(char *tempname, int n)
Definition: matview.c:538
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, int save_sec_context)
Definition: matview.c:581
DestReceiver * CreateTransientRelDestReceiver(Oid transientoid)
Definition: matview.c:432
static bool is_usable_unique_index(Relation indexRel)
Definition: matview.c:882
bool MatViewIncrementalMaintenanceIsEnabled(void)
Definition: matview.c:936
static void CloseMatViewIncrementalMaintenance(void)
Definition: matview.c:948
static void OpenMatViewIncrementalMaintenance(void)
Definition: matview.c:942
void SetMatViewPopulatedState(Relation relation, bool newstate)
Definition: matview.c:79
static int matview_maintenance_depth
Definition: matview.c:56
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
Definition: matview.c:872
ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, QueryCompletion *qc)
Definition: matview.c:133
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
Definition: matview.c:476
static void transientrel_shutdown(DestReceiver *self)
Definition: matview.c:504
void pfree(void *pointer)
Definition: mcxt.c:1520
void * palloc0(Size size)
Definition: mcxt.c:1346
#define SECURITY_RESTRICTED_OPERATION
Definition: miscadmin.h:315
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define SECURITY_LOCAL_USERID_CHANGE
Definition: miscadmin.h:314
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition: miscinit.c:635
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition: miscinit.c:642
MultiXactId ReadNextMultiXactId(void)
Definition: multixact.c:729
Oid RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode, uint32 flags, RangeVarGetRelidCallback callback, void *callback_arg)
Definition: namespace.c:426
#define copyObject(obj)
Definition: nodes.h:224
@ CMD_SELECT
Definition: nodes.h:265
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define CURSOR_OPT_PARALLEL_OK
Definition: parsenodes.h:3300
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
FormData_pg_index * Form_pg_index
Definition: pg_index.h:70
static int list_length(const List *l)
Definition: pg_list.h:152
#define linitial_node(type, l)
Definition: pg_list.h:181
#define NIL
Definition: pg_list.h:68
#define linitial(l)
Definition: pg_list.h:178
#define lfirst_oid(lc)
Definition: pg_list.h:174
FormData_pg_opclass * Form_pg_opclass
Definition: pg_opclass.h:83
#define plan(x)
Definition: pg_regress.c:162
void pgstat_count_heap_insert(Relation rel, PgStat_Counter n)
void pgstat_count_truncate(Relation rel)
PlannedStmt * pg_plan_query(Query *querytree, const char *query_string, int cursorOptions, ParamListInfo boundParams)
Definition: postgres.c:886
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
unsigned int Oid
Definition: postgres_ext.h:31
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:105
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)
Definition: pquery.c:67
static struct state * newstate(struct nfa *nfa)
Definition: regc_nfa.c:137
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetNumberOfAttributes(relation)
Definition: rel.h:511
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetTargetBlock(relation)
Definition: rel.h:601
#define RelationIsPopulated(relation)
Definition: rel.h:677
#define RelationGetNamespace(relation)
Definition: rel.h:546
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4760
List * RelationGetIndexPredicate(Relation relation)
Definition: relcache.c:5138
void AcquireRewriteLocks(Query *parsetree, bool forExecute, bool forUpdatePushedDown)
List * QueryRewrite(Query *parsetree)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:12707
void generate_operator_clause(StringInfo buf, const char *leftop, Oid leftoptype, Oid opoid, const char *rightop, Oid rightoptype)
Definition: ruleutils.c:13031
@ ForwardScanDirection
Definition: sdir.h:28
TransactionId RecentXmin
Definition: snapmgr.c:99
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:712
void PopActiveSnapshot(void)
Definition: snapmgr.c:743
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:700
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:770
#define InvalidSnapshot
Definition: snapshot.h:123
uint64 SPI_processed
Definition: spi.c:44
SPITupleTable * SPI_tuptable
Definition: spi.c:45
int SPI_connect(void)
Definition: spi.c:94
int SPI_finish(void)
Definition: spi.c:182
int SPI_exec(const char *src, long tcount)
Definition: spi.c:627
char * SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
Definition: spi.c:1217
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:593
#define SPI_OK_UTILITY
Definition: spi.h:85
#define SPI_OK_INSERT
Definition: spi.h:88
#define SPI_OK_DELETE
Definition: spi.h:89
#define SPI_OK_CONNECT
Definition: spi.h:82
#define SPI_OK_FINISH
Definition: spi.h:83
#define SPI_OK_SELECT
Definition: spi.h:86
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Relation transientrel
Definition: matview.c:50
BulkInsertState bistate
Definition: matview.c:53
DestReceiver pub
Definition: matview.c:47
CommandId output_cid
Definition: matview.c:51
Oid transientoid
Definition: matview.c:48
int ti_options
Definition: matview.c:52
uint64 es_processed
Definition: execnodes.h:671
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
EState * estate
Definition: execdesc.h:48
struct HeapTupleData * rd_indextuple
Definition: rel.h:194
TupleDesc rd_att
Definition: rel.h:112
Form_pg_index rd_index
Definition: rel.h:192
RuleLock * rd_rules
Definition: rel.h:115
Form_pg_class rd_rel
Definition: rel.h:111
RewriteRule ** rules
Definition: prs2lock.h:43
int numLocks
Definition: prs2lock.h:42
TupleDesc tupdesc
Definition: spi.h:25
HeapTuple * vals
Definition: spi.h:26
Definition: c.h:726
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:733
Definition: localtime.c:73
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:266
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:218
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:510
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
#define TABLE_INSERT_FROZEN
Definition: tableam.h:261
#define TABLE_INSERT_SKIP_FSM
Definition: tableam.h:260
static void table_finish_bulk_insert(Relation rel, int options)
Definition: tableam.h:1605
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1412
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition: tablecmds.c:4345
void RangeVarCallbackMaintainsTable(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg)
Definition: tablecmds.c:18362
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
void CommandCounterIncrement(void)
Definition: xact.c:1097
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:826