PostgreSQL Source Code  git master
matview.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * matview.c
4  * materialized view support
5  *
6  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/matview.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/multixact.h"
21 #include "access/tableam.h"
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "catalog/catalog.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/pg_am.h"
28 #include "catalog/pg_opclass.h"
29 #include "catalog/pg_operator.h"
30 #include "commands/cluster.h"
31 #include "commands/matview.h"
32 #include "commands/tablecmds.h"
33 #include "commands/tablespace.h"
34 #include "executor/executor.h"
35 #include "executor/spi.h"
36 #include "miscadmin.h"
37 #include "parser/parse_relation.h"
38 #include "pgstat.h"
39 #include "rewrite/rewriteHandler.h"
40 #include "storage/lmgr.h"
41 #include "storage/smgr.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/rel.h"
46 #include "utils/snapmgr.h"
47 #include "utils/syscache.h"
48 
49 
50 typedef struct
51 {
52  DestReceiver pub; /* publicly-known function pointers */
53  Oid transientoid; /* OID of new heap into which to store */
54  /* These fields are filled by transientrel_startup: */
55  Relation transientrel; /* relation to write to */
56  CommandId output_cid; /* cmin to insert in output tuples */
57  int ti_options; /* table_tuple_insert performance options */
58  BulkInsertState bistate; /* bulk insert state */
60 
62 
63 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
64 static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
65 static void transientrel_shutdown(DestReceiver *self);
66 static void transientrel_destroy(DestReceiver *self);
67 static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
68  const char *queryString);
69 static char *make_temptable_name_n(char *tempname, int n);
70 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
71  int save_sec_context);
72 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
73 static bool is_usable_unique_index(Relation indexRel);
74 static void OpenMatViewIncrementalMaintenance(void);
75 static void CloseMatViewIncrementalMaintenance(void);
76 
77 /*
78  * SetMatViewPopulatedState
79  * Mark a materialized view as populated, or not.
80  *
81  * NOTE: caller must be holding an appropriate lock on the relation.
82  */
83 void
85 {
86  Relation pgrel;
87  HeapTuple tuple;
88 
89  Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
90 
91  /*
92  * Update relation's pg_class entry. Crucial side-effect: other backends
93  * (and this one too!) are sent SI message to make them rebuild relcache
94  * entries.
95  */
96  pgrel = table_open(RelationRelationId, RowExclusiveLock);
99  if (!HeapTupleIsValid(tuple))
100  elog(ERROR, "cache lookup failed for relation %u",
101  RelationGetRelid(relation));
102 
103  ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
104 
105  CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
106 
107  heap_freetuple(tuple);
109 
110  /*
111  * Advance command counter to make the updated pg_class row locally
112  * visible.
113  */
115 }
116 
117 /*
118  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
119  *
120  * This refreshes the materialized view by creating a new table and swapping
121  * the relfilenodes of the new table and the old materialized view, so the OID
122  * of the original materialized view is preserved. Thus we do not lose GRANT
123  * nor references to this materialized view.
124  *
125  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
126  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
127  * statement associated with the materialized view. The statement node's
128  * skipData field shows whether the clause was used.
129  *
130  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
131  * the new heap, it's better to create the indexes afterwards than to fill them
132  * incrementally while we load.
133  *
134  * The matview's "populated" state is changed based on whether the contents
135  * reflect the result set of the materialized view's query.
136  */
138 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
139  ParamListInfo params, QueryCompletion *qc)
140 {
141  Oid matviewOid;
142  Relation matviewRel;
143  RewriteRule *rule;
144  List *actions;
145  Query *dataQuery;
146  Oid tableSpace;
147  Oid relowner;
148  Oid OIDNewHeap;
150  uint64 processed = 0;
151  bool concurrent;
152  LOCKMODE lockmode;
153  char relpersistence;
154  Oid save_userid;
155  int save_sec_context;
156  int save_nestlevel;
157  ObjectAddress address;
158 
159  /* Determine strength of lock needed. */
160  concurrent = stmt->concurrent;
161  lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
162 
163  /*
164  * Get a lock until end of transaction.
165  */
166  matviewOid = RangeVarGetRelidExtended(stmt->relation,
167  lockmode, 0,
169  matviewRel = table_open(matviewOid, NoLock);
170 
171  /* Make sure it is a materialized view. */
172  if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
173  ereport(ERROR,
174  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
175  errmsg("\"%s\" is not a materialized view",
176  RelationGetRelationName(matviewRel))));
177 
178  /* Check that CONCURRENTLY is not specified if not populated. */
179  if (concurrent && !RelationIsPopulated(matviewRel))
180  ereport(ERROR,
181  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
182  errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
183 
184  /* Check that conflicting options have not been specified. */
185  if (concurrent && stmt->skipData)
186  ereport(ERROR,
187  (errcode(ERRCODE_SYNTAX_ERROR),
188  errmsg("%s and %s options cannot be used together",
189  "CONCURRENTLY", "WITH NO DATA")));
190 
191  /*
192  * Check that everything is correct for a refresh. Problems at this point
193  * are internal errors, so elog is sufficient.
194  */
195  if (matviewRel->rd_rel->relhasrules == false ||
196  matviewRel->rd_rules->numLocks < 1)
197  elog(ERROR,
198  "materialized view \"%s\" is missing rewrite information",
199  RelationGetRelationName(matviewRel));
200 
201  if (matviewRel->rd_rules->numLocks > 1)
202  elog(ERROR,
203  "materialized view \"%s\" has too many rules",
204  RelationGetRelationName(matviewRel));
205 
206  rule = matviewRel->rd_rules->rules[0];
207  if (rule->event != CMD_SELECT || !(rule->isInstead))
208  elog(ERROR,
209  "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
210  RelationGetRelationName(matviewRel));
211 
212  actions = rule->actions;
213  if (list_length(actions) != 1)
214  elog(ERROR,
215  "the rule for materialized view \"%s\" is not a single action",
216  RelationGetRelationName(matviewRel));
217 
218  /*
219  * Check that there is a unique index with no WHERE clause on one or more
220  * columns of the materialized view if CONCURRENTLY is specified.
221  */
222  if (concurrent)
223  {
224  List *indexoidlist = RelationGetIndexList(matviewRel);
225  ListCell *indexoidscan;
226  bool hasUniqueIndex = false;
227 
228  foreach(indexoidscan, indexoidlist)
229  {
230  Oid indexoid = lfirst_oid(indexoidscan);
231  Relation indexRel;
232 
233  indexRel = index_open(indexoid, AccessShareLock);
234  hasUniqueIndex = is_usable_unique_index(indexRel);
235  index_close(indexRel, AccessShareLock);
236  if (hasUniqueIndex)
237  break;
238  }
239 
240  list_free(indexoidlist);
241 
242  if (!hasUniqueIndex)
243  ereport(ERROR,
244  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
245  errmsg("cannot refresh materialized view \"%s\" concurrently",
247  RelationGetRelationName(matviewRel))),
248  errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
249  }
250 
251  /*
252  * The stored query was rewritten at the time of the MV definition, but
253  * has not been scribbled on by the planner.
254  */
255  dataQuery = linitial_node(Query, actions);
256 
257  /*
258  * Check for active uses of the relation in the current transaction, such
259  * as open scans.
260  *
261  * NB: We count on this to protect us against problems with refreshing the
262  * data using TABLE_INSERT_FROZEN.
263  */
264  CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
265 
266  /*
267  * Tentatively mark the matview as populated or not (this will roll back
268  * if we fail later).
269  */
270  SetMatViewPopulatedState(matviewRel, !stmt->skipData);
271 
272  relowner = matviewRel->rd_rel->relowner;
273 
274  /*
275  * Switch to the owner's userid, so that any functions are run as that
276  * user. Also arrange to make GUC variable changes local to this command.
277  * Don't lock it down too tight to create a temporary table just yet. We
278  * will switch modes when we are about to execute user code.
279  */
280  GetUserIdAndSecContext(&save_userid, &save_sec_context);
281  SetUserIdAndSecContext(relowner,
282  save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
283  save_nestlevel = NewGUCNestLevel();
284 
285  /* Concurrent refresh builds new data in temp tablespace, and does diff. */
286  if (concurrent)
287  {
288  tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
289  relpersistence = RELPERSISTENCE_TEMP;
290  }
291  else
292  {
293  tableSpace = matviewRel->rd_rel->reltablespace;
294  relpersistence = matviewRel->rd_rel->relpersistence;
295  }
296 
297  /*
298  * Create the transient table that will receive the regenerated data. Lock
299  * it against access by any other process until commit (by which time it
300  * will be gone).
301  */
302  OIDNewHeap = make_new_heap(matviewOid, tableSpace,
303  matviewRel->rd_rel->relam,
304  relpersistence, ExclusiveLock);
306  dest = CreateTransientRelDestReceiver(OIDNewHeap);
307 
308  /*
309  * Now lock down security-restricted operations.
310  */
311  SetUserIdAndSecContext(relowner,
312  save_sec_context | SECURITY_RESTRICTED_OPERATION);
313 
314  /* Generate the data, if wanted. */
315  if (!stmt->skipData)
316  processed = refresh_matview_datafill(dest, dataQuery, queryString);
317 
318  /* Make the matview match the newly generated data. */
319  if (concurrent)
320  {
321  int old_depth = matview_maintenance_depth;
322 
323  PG_TRY();
324  {
325  refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
326  save_sec_context);
327  }
328  PG_CATCH();
329  {
330  matview_maintenance_depth = old_depth;
331  PG_RE_THROW();
332  }
333  PG_END_TRY();
334  Assert(matview_maintenance_depth == old_depth);
335  }
336  else
337  {
338  refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
339 
340  /*
341  * Inform stats collector about our activity: basically, we truncated
342  * the matview and inserted some new data. (The concurrent code path
343  * above doesn't need to worry about this because the inserts and
344  * deletes it issues get counted by lower-level code.)
345  */
346  pgstat_count_truncate(matviewRel);
347  if (!stmt->skipData)
348  pgstat_count_heap_insert(matviewRel, processed);
349  }
350 
351  table_close(matviewRel, NoLock);
352 
353  /* Roll back any GUC changes */
354  AtEOXact_GUC(false, save_nestlevel);
355 
356  /* Restore userid and security context */
357  SetUserIdAndSecContext(save_userid, save_sec_context);
358 
359  ObjectAddressSet(address, RelationRelationId, matviewOid);
360 
361  /*
362  * Save the rowcount so that pg_stat_statements can track the total number
363  * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
364  * still don't display the rowcount in the command completion tag output,
365  * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
366  * command tag is left false in cmdtaglist.h. Otherwise, the change of
367  * completion tag output might break applications using it.
368  */
369  if (qc)
370  SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed);
371 
372  return address;
373 }
374 
375 /*
376  * refresh_matview_datafill
377  *
378  * Execute the given query, sending result rows to "dest" (which will
379  * insert them into the target matview).
380  *
381  * Returns number of rows inserted.
382  */
383 static uint64
385  const char *queryString)
386 {
387  List *rewritten;
388  PlannedStmt *plan;
389  QueryDesc *queryDesc;
390  Query *copied_query;
391  uint64 processed;
392 
393  /* Lock and rewrite, using a copy to preserve the original query. */
394  copied_query = copyObject(query);
395  AcquireRewriteLocks(copied_query, true, false);
396  rewritten = QueryRewrite(copied_query);
397 
398  /* SELECT should never rewrite to more or less than one SELECT query */
399  if (list_length(rewritten) != 1)
400  elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
401  query = (Query *) linitial(rewritten);
402 
403  /* Check for user-requested abort. */
405 
406  /* Plan the query which will generate data for the refresh. */
407  plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
408 
409  /*
410  * Use a snapshot with an updated command ID to ensure this query sees
411  * results of any previously executed queries. (This could only matter if
412  * the planner executed an allegedly-stable function that changed the
413  * database contents, but let's do it anyway to be safe.)
414  */
417 
418  /* Create a QueryDesc, redirecting output to our tuple receiver */
419  queryDesc = CreateQueryDesc(plan, queryString,
421  dest, NULL, NULL, 0);
422 
423  /* call ExecutorStart to prepare the plan for execution */
424  ExecutorStart(queryDesc, 0);
425 
426  /* run the plan */
427  ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
428 
429  processed = queryDesc->estate->es_processed;
430 
431  /* and clean up */
432  ExecutorFinish(queryDesc);
433  ExecutorEnd(queryDesc);
434 
435  FreeQueryDesc(queryDesc);
436 
438 
439  return processed;
440 }
441 
442 DestReceiver *
444 {
446 
447  self->pub.receiveSlot = transientrel_receive;
448  self->pub.rStartup = transientrel_startup;
449  self->pub.rShutdown = transientrel_shutdown;
450  self->pub.rDestroy = transientrel_destroy;
451  self->pub.mydest = DestTransientRel;
452  self->transientoid = transientoid;
453 
454  return (DestReceiver *) self;
455 }
456 
457 /*
458  * transientrel_startup --- executor startup
459  */
460 static void
461 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
462 {
463  DR_transientrel *myState = (DR_transientrel *) self;
464  Relation transientrel;
465 
466  transientrel = table_open(myState->transientoid, NoLock);
467 
468  /*
469  * Fill private fields of myState for use by later routines
470  */
471  myState->transientrel = transientrel;
472  myState->output_cid = GetCurrentCommandId(true);
474  myState->bistate = GetBulkInsertState();
475 
476  /*
477  * Valid smgr_targblock implies something already wrote to the relation.
478  * This may be harmless, but this function hasn't planned for it.
479  */
481 }
482 
483 /*
484  * transientrel_receive --- receive one tuple
485  */
486 static bool
488 {
489  DR_transientrel *myState = (DR_transientrel *) self;
490 
491  /*
492  * Note that the input slot might not be of the type of the target
493  * relation. That's supported by table_tuple_insert(), but slightly less
494  * efficient than inserting with the right slot - but the alternative
495  * would be to copy into a slot of the right type, which would not be
496  * cheap either. This also doesn't allow accessing per-AM data (say a
497  * tuple's xmin), but since we don't do that here...
498  */
499 
501  slot,
502  myState->output_cid,
503  myState->ti_options,
504  myState->bistate);
505 
506  /* We know this is a newly created relation, so there are no indexes */
507 
508  return true;
509 }
510 
511 /*
512  * transientrel_shutdown --- executor end
513  */
514 static void
516 {
517  DR_transientrel *myState = (DR_transientrel *) self;
518 
519  FreeBulkInsertState(myState->bistate);
520 
522 
523  /* close transientrel, but keep lock until commit */
524  table_close(myState->transientrel, NoLock);
525  myState->transientrel = NULL;
526 }
527 
528 /*
529  * transientrel_destroy --- release DestReceiver object
530  */
531 static void
533 {
534  pfree(self);
535 }
536 
537 
538 /*
539  * Given a qualified temporary table name, append an underscore followed by
540  * the given integer, to make a new table name based on the old one.
541  * The result is a palloc'd string.
542  *
543  * As coded, this would fail to make a valid SQL name if the given name were,
544  * say, "FOO"."BAR". Currently, the table name portion of the input will
545  * never be double-quoted because it's of the form "pg_temp_NNN", cf
546  * make_new_heap(). But we might have to work harder someday.
547  */
548 static char *
549 make_temptable_name_n(char *tempname, int n)
550 {
551  StringInfoData namebuf;
552 
553  initStringInfo(&namebuf);
554  appendStringInfoString(&namebuf, tempname);
555  appendStringInfo(&namebuf, "_%d", n);
556  return namebuf.data;
557 }
558 
559 /*
560  * refresh_by_match_merge
561  *
562  * Refresh a materialized view with transactional semantics, while allowing
563  * concurrent reads.
564  *
565  * This is called after a new version of the data has been created in a
566  * temporary table. It performs a full outer join against the old version of
567  * the data, producing "diff" results. This join cannot work if there are any
568  * duplicated rows in either the old or new versions, in the sense that every
569  * column would compare as equal between the two rows. It does work correctly
570  * in the face of rows which have at least one NULL value, with all non-NULL
571  * columns equal. The behavior of NULLs on equality tests and on UNIQUE
572  * indexes turns out to be quite convenient here; the tests we need to make
573  * are consistent with default behavior. If there is at least one UNIQUE
574  * index on the materialized view, we have exactly the guarantee we need.
575  *
576  * The temporary table used to hold the diff results contains just the TID of
577  * the old record (if matched) and the ROW from the new table as a single
578  * column of complex record type (if matched).
579  *
580  * Once we have the diff table, we perform set-based DELETE and INSERT
581  * operations against the materialized view, and discard both temporary
582  * tables.
583  *
584  * Everything from the generation of the new data to applying the differences
585  * takes place under cover of an ExclusiveLock, since it seems as though we
586  * would want to prohibit not only concurrent REFRESH operations, but also
587  * incremental maintenance. It also doesn't seem reasonable or safe to allow
588  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
589  * this command.
590  */
591 static void
592 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
593  int save_sec_context)
594 {
595  StringInfoData querybuf;
596  Relation matviewRel;
597  Relation tempRel;
598  char *matviewname;
599  char *tempname;
600  char *diffname;
601  TupleDesc tupdesc;
602  bool foundUniqueIndex;
603  List *indexoidlist;
604  ListCell *indexoidscan;
605  int16 relnatts;
606  Oid *opUsedForQual;
607 
608  initStringInfo(&querybuf);
609  matviewRel = table_open(matviewOid, NoLock);
611  RelationGetRelationName(matviewRel));
612  tempRel = table_open(tempOid, NoLock);
614  RelationGetRelationName(tempRel));
615  diffname = make_temptable_name_n(tempname, 2);
616 
617  relnatts = RelationGetNumberOfAttributes(matviewRel);
618 
619  /* Open SPI context. */
620  if (SPI_connect() != SPI_OK_CONNECT)
621  elog(ERROR, "SPI_connect failed");
622 
623  /* Analyze the temp table with the new contents. */
624  appendStringInfo(&querybuf, "ANALYZE %s", tempname);
625  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
626  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
627 
628  /*
629  * We need to ensure that there are not duplicate rows without NULLs in
630  * the new data set before we can count on the "diff" results. Check for
631  * that in a way that allows showing the first duplicated row found. Even
632  * after we pass this test, a unique index on the materialized view may
633  * find a duplicate key problem.
634  *
635  * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
636  * keep ".*" from being expanded into multiple columns in a SELECT list.
637  * Compare ruleutils.c's get_variable().
638  */
639  resetStringInfo(&querybuf);
640  appendStringInfo(&querybuf,
641  "SELECT newdata.*::%s FROM %s newdata "
642  "WHERE newdata.* IS NOT NULL AND EXISTS "
643  "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
644  "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
645  "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
646  "newdata.ctid)",
647  tempname, tempname, tempname);
648  if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
649  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
650  if (SPI_processed > 0)
651  {
652  /*
653  * Note that this ereport() is returning data to the user. Generally,
654  * we would want to make sure that the user has been granted access to
655  * this data. However, REFRESH MAT VIEW is only able to be run by the
656  * owner of the mat view (or a superuser) and therefore there is no
657  * need to check for access to data in the mat view.
658  */
659  ereport(ERROR,
660  (errcode(ERRCODE_CARDINALITY_VIOLATION),
661  errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
662  RelationGetRelationName(matviewRel)),
663  errdetail("Row: %s",
665  }
666 
667  SetUserIdAndSecContext(relowner,
668  save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
669 
670  /* Start building the query for creating the diff table. */
671  resetStringInfo(&querybuf);
672  appendStringInfo(&querybuf,
673  "CREATE TEMP TABLE %s AS "
674  "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
675  "FROM %s mv FULL JOIN %s newdata ON (",
676  diffname, tempname, matviewname, tempname);
677 
678  /*
679  * Get the list of index OIDs for the table from the relcache, and look up
680  * each one in the pg_index syscache. We will test for equality on all
681  * columns present in all unique indexes which only reference columns and
682  * include all rows.
683  */
684  tupdesc = matviewRel->rd_att;
685  opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
686  foundUniqueIndex = false;
687 
688  indexoidlist = RelationGetIndexList(matviewRel);
689 
690  foreach(indexoidscan, indexoidlist)
691  {
692  Oid indexoid = lfirst_oid(indexoidscan);
693  Relation indexRel;
694 
695  indexRel = index_open(indexoid, RowExclusiveLock);
696  if (is_usable_unique_index(indexRel))
697  {
698  Form_pg_index indexStruct = indexRel->rd_index;
699  int indnkeyatts = indexStruct->indnkeyatts;
700  oidvector *indclass;
701  Datum indclassDatum;
702  bool isnull;
703  int i;
704 
705  /* Must get indclass the hard way. */
706  indclassDatum = SysCacheGetAttr(INDEXRELID,
707  indexRel->rd_indextuple,
708  Anum_pg_index_indclass,
709  &isnull);
710  Assert(!isnull);
711  indclass = (oidvector *) DatumGetPointer(indclassDatum);
712 
713  /* Add quals for all columns from this index. */
714  for (i = 0; i < indnkeyatts; i++)
715  {
716  int attnum = indexStruct->indkey.values[i];
717  Oid opclass = indclass->values[i];
718  Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
719  Oid attrtype = attr->atttypid;
720  HeapTuple cla_ht;
721  Form_pg_opclass cla_tup;
722  Oid opfamily;
723  Oid opcintype;
724  Oid op;
725  const char *leftop;
726  const char *rightop;
727 
728  /*
729  * Identify the equality operator associated with this index
730  * column. First we need to look up the column's opclass.
731  */
732  cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
733  if (!HeapTupleIsValid(cla_ht))
734  elog(ERROR, "cache lookup failed for opclass %u", opclass);
735  cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
736  Assert(cla_tup->opcmethod == BTREE_AM_OID);
737  opfamily = cla_tup->opcfamily;
738  opcintype = cla_tup->opcintype;
739  ReleaseSysCache(cla_ht);
740 
741  op = get_opfamily_member(opfamily, opcintype, opcintype,
743  if (!OidIsValid(op))
744  elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
745  BTEqualStrategyNumber, opcintype, opcintype, opfamily);
746 
747  /*
748  * If we find the same column with the same equality semantics
749  * in more than one index, we only need to emit the equality
750  * clause once.
751  *
752  * Since we only remember the last equality operator, this
753  * code could be fooled into emitting duplicate clauses given
754  * multiple indexes with several different opclasses ... but
755  * that's so unlikely it doesn't seem worth spending extra
756  * code to avoid.
757  */
758  if (opUsedForQual[attnum - 1] == op)
759  continue;
760  opUsedForQual[attnum - 1] = op;
761 
762  /*
763  * Actually add the qual, ANDed with any others.
764  */
765  if (foundUniqueIndex)
766  appendStringInfoString(&querybuf, " AND ");
767 
768  leftop = quote_qualified_identifier("newdata",
769  NameStr(attr->attname));
770  rightop = quote_qualified_identifier("mv",
771  NameStr(attr->attname));
772 
773  generate_operator_clause(&querybuf,
774  leftop, attrtype,
775  op,
776  rightop, attrtype);
777 
778  foundUniqueIndex = true;
779  }
780  }
781 
782  /* Keep the locks, since we're about to run DML which needs them. */
783  index_close(indexRel, NoLock);
784  }
785 
786  list_free(indexoidlist);
787 
788  /*
789  * There must be at least one usable unique index on the matview.
790  *
791  * ExecRefreshMatView() checks that after taking the exclusive lock on the
792  * matview. So at least one unique index is guaranteed to exist here
793  * because the lock is still being held; so an Assert seems sufficient.
794  */
795  Assert(foundUniqueIndex);
796 
797  appendStringInfoString(&querybuf,
798  " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
799  "WHERE newdata.* IS NULL OR mv.* IS NULL "
800  "ORDER BY tid");
801 
802  /* Create the temporary "diff" table. */
803  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
804  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
805 
806  SetUserIdAndSecContext(relowner,
807  save_sec_context | SECURITY_RESTRICTED_OPERATION);
808 
809  /*
810  * We have no further use for data from the "full-data" temp table, but we
811  * must keep it around because its type is referenced from the diff table.
812  */
813 
814  /* Analyze the diff table. */
815  resetStringInfo(&querybuf);
816  appendStringInfo(&querybuf, "ANALYZE %s", diffname);
817  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
818  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
819 
821 
822  /* Deletes must come before inserts; do them first. */
823  resetStringInfo(&querybuf);
824  appendStringInfo(&querybuf,
825  "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
826  "(SELECT diff.tid FROM %s diff "
827  "WHERE diff.tid IS NOT NULL "
828  "AND diff.newdata IS NULL)",
829  matviewname, diffname);
830  if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
831  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
832 
833  /* Inserts go last. */
834  resetStringInfo(&querybuf);
835  appendStringInfo(&querybuf,
836  "INSERT INTO %s SELECT (diff.newdata).* "
837  "FROM %s diff WHERE tid IS NULL",
838  matviewname, diffname);
839  if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
840  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
841 
842  /* We're done maintaining the materialized view. */
844  table_close(tempRel, NoLock);
845  table_close(matviewRel, NoLock);
846 
847  /* Clean up temp tables. */
848  resetStringInfo(&querybuf);
849  appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
850  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
851  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
852 
853  /* Close SPI context. */
854  if (SPI_finish() != SPI_OK_FINISH)
855  elog(ERROR, "SPI_finish failed");
856 }
857 
858 /*
859  * Swap the physical files of the target and transient tables, then rebuild
860  * the target's indexes and throw away the transient table. Security context
861  * swapping is handled by the called function, so it is not needed here.
862  */
863 static void
864 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
865 {
866  finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
867  RecentXmin, ReadNextMultiXactId(), relpersistence);
868 }
869 
870 /*
871  * Check whether specified index is usable for match merge.
872  */
873 static bool
875 {
876  Form_pg_index indexStruct = indexRel->rd_index;
877 
878  /*
879  * Must be unique, valid, immediate, non-partial, and be defined over
880  * plain user columns (not expressions). We also require it to be a
881  * btree. Even if we had any other unique index kinds, we'd not know how
882  * to identify the corresponding equality operator, nor could we be sure
883  * that the planner could implement the required FULL JOIN with non-btree
884  * operators.
885  */
886  if (indexStruct->indisunique &&
887  indexStruct->indimmediate &&
888  indexRel->rd_rel->relam == BTREE_AM_OID &&
889  indexStruct->indisvalid &&
890  RelationGetIndexPredicate(indexRel) == NIL &&
891  indexStruct->indnatts > 0)
892  {
893  /*
894  * The point of groveling through the index columns individually is to
895  * reject both index expressions and system columns. Currently,
896  * matviews couldn't have OID columns so there's no way to create an
897  * index on a system column; but maybe someday that wouldn't be true,
898  * so let's be safe.
899  */
900  int numatts = indexStruct->indnatts;
901  int i;
902 
903  for (i = 0; i < numatts; i++)
904  {
905  int attnum = indexStruct->indkey.values[i];
906 
907  if (attnum <= 0)
908  return false;
909  }
910  return true;
911  }
912  return false;
913 }
914 
915 
916 /*
917  * This should be used to test whether the backend is in a context where it is
918  * OK to allow DML statements to modify materialized views. We only want to
919  * allow that for internal code driven by the materialized view definition,
920  * not for arbitrary user-supplied code.
921  *
922  * While the function names reflect the fact that their main intended use is
923  * incremental maintenance of materialized views (in response to changes to
924  * the data in referenced relations), they are initially used to allow REFRESH
925  * without blocking concurrent reads.
926  */
927 bool
929 {
930  return matview_maintenance_depth > 0;
931 }
932 
933 static void
935 {
937 }
938 
939 static void
941 {
944 }
Oid GetDefaultTablespace(char relpersistence, bool partitioned)
Definition: tablespace.c:1182
#define InvalidBlockNumber
Definition: block.h:33
#define NameStr(name)
Definition: c.h:681
signed short int16
Definition: c.h:428
uint32 CommandId
Definition: c.h:601
#define OidIsValid(objectId)
Definition: c.h:710
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition: cluster.c:1362
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
Definition: cluster.c:631
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
@ DestTransientRel
Definition: dest.h:99
int errdetail(const char *fmt,...)
Definition: elog.c:1037
int errhint(const char *fmt,...)
Definition: elog.c:1151
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define PG_RE_THROW()
Definition: elog.h:340
#define PG_END_TRY()
Definition: elog.h:324
#define PG_TRY()
Definition: elog.h:299
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define PG_CATCH()
Definition: elog.h:309
#define ereport(elevel,...)
Definition: elog.h:143
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:459
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:399
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:130
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once)
Definition: execMain.c:298
int NewGUCNestLevel(void)
Definition: guc.c:6259
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition: guc.c:6273
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:2007
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:2021
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:158
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:132
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free(List *list)
Definition: list.c:1505
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
int LOCKMODE
Definition: lockdefs.h:26
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ExclusiveLock
Definition: lockdefs.h:42
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:164
static void transientrel_destroy(DestReceiver *self)
Definition: matview.c:532
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: matview.c:461
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, const char *queryString)
Definition: matview.c:384
static char * make_temptable_name_n(char *tempname, int n)
Definition: matview.c:549
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, int save_sec_context)
Definition: matview.c:592
DestReceiver * CreateTransientRelDestReceiver(Oid transientoid)
Definition: matview.c:443
static bool is_usable_unique_index(Relation indexRel)
Definition: matview.c:874
bool MatViewIncrementalMaintenanceIsEnabled(void)
Definition: matview.c:928
static void CloseMatViewIncrementalMaintenance(void)
Definition: matview.c:940
static void OpenMatViewIncrementalMaintenance(void)
Definition: matview.c:934
void SetMatViewPopulatedState(Relation relation, bool newstate)
Definition: matview.c:84
static int matview_maintenance_depth
Definition: matview.c:61
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
Definition: matview.c:864
ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, QueryCompletion *qc)
Definition: matview.c:138
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
Definition: matview.c:487
static void transientrel_shutdown(DestReceiver *self)
Definition: matview.c:515
void pfree(void *pointer)
Definition: mcxt.c:1169
void * palloc0(Size size)
Definition: mcxt.c:1093
#define SECURITY_RESTRICTED_OPERATION
Definition: miscadmin.h:312
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
#define SECURITY_LOCAL_USERID_CHANGE
Definition: miscadmin.h:311
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition: miscinit.c:600
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition: miscinit.c:607
MultiXactId ReadNextMultiXactId(void)
Definition: multixact.c:723
Oid RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode, uint32 flags, RangeVarGetRelidCallback callback, void *callback_arg)
Definition: namespace.c:237
#define copyObject(obj)
Definition: nodes.h:655
@ CMD_SELECT
Definition: nodes.h:686
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define CURSOR_OPT_PARALLEL_OK
Definition: parsenodes.h:2830
int16 attnum
Definition: pg_attribute.h:83
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
FormData_pg_index * Form_pg_index
Definition: pg_index.h:69
static int list_length(const List *l)
Definition: pg_list.h:149
#define linitial_node(type, l)
Definition: pg_list.h:177
#define NIL
Definition: pg_list.h:65
#define linitial(l)
Definition: pg_list.h:174
#define lfirst_oid(lc)
Definition: pg_list.h:171
FormData_pg_opclass * Form_pg_opclass
Definition: pg_opclass.h:83
void pgstat_count_heap_insert(Relation rel, PgStat_Counter n)
Definition: pgstat.c:2358
void pgstat_count_truncate(Relation rel)
Definition: pgstat.c:2462
PlannedStmt * pg_plan_query(Query *querytree, const char *query_string, int cursorOptions, ParamListInfo boundParams)
Definition: postgres.c:830
uintptr_t Datum
Definition: postgres.h:411
#define DatumGetPointer(X)
Definition: postgres.h:593
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
unsigned int Oid
Definition: postgres_ext.h:31
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:105
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)
Definition: pquery.c:67
static struct state * newstate(struct nfa *nfa)
Definition: regc_nfa.c:137
#define RelationGetRelid(relation)
Definition: rel.h:478
#define RelationGetNumberOfAttributes(relation)
Definition: rel.h:484
#define RelationGetRelationName(relation)
Definition: rel.h:512
#define RelationGetTargetBlock(relation)
Definition: rel.h:575
#define RelationIsPopulated(relation)
Definition: rel.h:651
#define RelationGetNamespace(relation)
Definition: rel.h:519
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4649
List * RelationGetIndexPredicate(Relation relation)
Definition: relcache.c:4996
void AcquireRewriteLocks(Query *parsetree, bool forExecute, bool forUpdatePushedDown)
List * QueryRewrite(Query *parsetree)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11524
void generate_operator_clause(StringInfo buf, const char *leftop, Oid leftoptype, Oid opoid, const char *rightop, Oid rightoptype)
Definition: ruleutils.c:11848
@ ForwardScanDirection
Definition: sdir.h:26
TransactionId RecentXmin
Definition: snapmgr.c:113
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:743
void PopActiveSnapshot(void)
Definition: snapmgr.c:774
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:731
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:801
#define InvalidSnapshot
Definition: snapshot.h:123
uint64 SPI_processed
Definition: spi.c:45
SPITupleTable * SPI_tuptable
Definition: spi.c:46
int SPI_connect(void)
Definition: spi.c:95
int SPI_finish(void)
Definition: spi.c:182
int SPI_exec(const char *src, long tcount)
Definition: spi.c:535
char * SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
Definition: spi.c:1125
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:501
#define SPI_OK_UTILITY
Definition: spi.h:85
#define SPI_OK_INSERT
Definition: spi.h:88
#define SPI_OK_DELETE
Definition: spi.h:89
#define SPI_OK_CONNECT
Definition: spi.h:82
#define SPI_OK_FINISH
Definition: spi.h:83
#define SPI_OK_SELECT
Definition: spi.h:86
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Relation transientrel
Definition: matview.c:55
BulkInsertState bistate
Definition: matview.c:58
DestReceiver pub
Definition: matview.c:52
CommandId output_cid
Definition: matview.c:56
Oid transientoid
Definition: matview.c:53
int ti_options
Definition: matview.c:57
uint64 es_processed
Definition: execnodes.h:609
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:51
EState * estate
Definition: execdesc.h:48
RangeVar * relation
Definition: parsenodes.h:3426
struct HeapTupleData * rd_indextuple
Definition: rel.h:190
TupleDesc rd_att
Definition: rel.h:110
Form_pg_index rd_index
Definition: rel.h:188
RuleLock * rd_rules
Definition: rel.h:113
Form_pg_class rd_rel
Definition: rel.h:109
RewriteRule ** rules
Definition: prs2lock.h:43
int numLocks
Definition: prs2lock.h:42
TupleDesc tupdesc
Definition: spi.h:25
HeapTuple * vals
Definition: spi.h:26
Definition: c.h:661
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:668
Definition: localtime.c:73
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1198
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1150
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1411
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:177
@ INDEXRELID
Definition: syscache.h:66
@ RELOID
Definition: syscache.h:87
@ CLAOID
Definition: syscache.h:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define TABLE_INSERT_FROZEN
Definition: tableam.h:237
#define TABLE_INSERT_SKIP_FSM
Definition: tableam.h:236
static void table_finish_bulk_insert(Relation rel, int options)
Definition: tableam.h:1566
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1373
void RangeVarCallbackOwnsTable(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg)
Definition: tablecmds.c:16872
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition: tablecmds.c:3963
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
void CommandCounterIncrement(void)
Definition: xact.c:1073
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:813