PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
matview.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * matview.c
4  * materialized view support
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/matview.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/multixact.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/pg_operator.h"
25 #include "commands/cluster.h"
26 #include "commands/matview.h"
27 #include "commands/tablecmds.h"
28 #include "commands/tablespace.h"
29 #include "executor/executor.h"
30 #include "executor/spi.h"
31 #include "miscadmin.h"
32 #include "parser/parse_relation.h"
33 #include "pgstat.h"
34 #include "rewrite/rewriteHandler.h"
35 #include "storage/lmgr.h"
36 #include "storage/smgr.h"
37 #include "tcop/tcopprot.h"
38 #include "utils/builtins.h"
39 #include "utils/lsyscache.h"
40 #include "utils/rel.h"
41 #include "utils/snapmgr.h"
42 #include "utils/syscache.h"
43 #include "utils/typcache.h"
44 
45 
46 typedef struct
47 {
48  DestReceiver pub; /* publicly-known function pointers */
49  Oid transientoid; /* OID of new heap into which to store */
50  /* These fields are filled by transientrel_startup: */
51  Relation transientrel; /* relation to write to */
52  CommandId output_cid; /* cmin to insert in output tuples */
53  int hi_options; /* heap_insert performance options */
54  BulkInsertState bistate; /* bulk insert state */
56 
58 
59 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
60 static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
61 static void transientrel_shutdown(DestReceiver *self);
62 static void transientrel_destroy(DestReceiver *self);
63 static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
64  const char *queryString);
65 
66 static char *make_temptable_name_n(char *tempname, int n);
67 static void mv_GenerateOper(StringInfo buf, Oid opoid);
68 
69 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
70  int save_sec_context);
71 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
72 
73 static void OpenMatViewIncrementalMaintenance(void);
74 static void CloseMatViewIncrementalMaintenance(void);
75 
76 /*
77  * SetMatViewPopulatedState
78  * Mark a materialized view as populated, or not.
79  *
80  * NOTE: caller must be holding an appropriate lock on the relation.
81  */
82 void
84 {
85  Relation pgrel;
86  HeapTuple tuple;
87 
88  Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
89 
90  /*
91  * Update relation's pg_class entry. Crucial side-effect: other backends
92  * (and this one too!) are sent SI message to make them rebuild relcache
93  * entries.
94  */
98  if (!HeapTupleIsValid(tuple))
99  elog(ERROR, "cache lookup failed for relation %u",
100  RelationGetRelid(relation));
101 
102  ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
103 
104  CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
105 
106  heap_freetuple(tuple);
108 
109  /*
110  * Advance command counter to make the updated pg_class row locally
111  * visible.
112  */
114 }
115 
116 /*
117  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
118  *
119  * This refreshes the materialized view by creating a new table and swapping
120  * the relfilenodes of the new table and the old materialized view, so the OID
121  * of the original materialized view is preserved. Thus we do not lose GRANT
122  * nor references to this materialized view.
123  *
124  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
125  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
126  * statement associated with the materialized view. The statement node's
127  * skipData field shows whether the clause was used.
128  *
129  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
130  * the new heap, it's better to create the indexes afterwards than to fill them
131  * incrementally while we load.
132  *
133  * The matview's "populated" state is changed based on whether the contents
134  * reflect the result set of the materialized view's query.
135  */
137 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
138  ParamListInfo params, char *completionTag)
139 {
140  Oid matviewOid;
141  Relation matviewRel;
142  RewriteRule *rule;
143  List *actions;
144  Query *dataQuery;
145  Oid tableSpace;
146  Oid relowner;
147  Oid OIDNewHeap;
148  DestReceiver *dest;
149  uint64 processed = 0;
150  bool concurrent;
151  LOCKMODE lockmode;
152  char relpersistence;
153  Oid save_userid;
154  int save_sec_context;
155  int save_nestlevel;
156  ObjectAddress address;
157 
158  /* Determine strength of lock needed. */
159  concurrent = stmt->concurrent;
160  lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
161 
162  /*
163  * Get a lock until end of transaction.
164  */
165  matviewOid = RangeVarGetRelidExtended(stmt->relation,
166  lockmode, false, false,
168  matviewRel = heap_open(matviewOid, NoLock);
169 
170  /* Make sure it is a materialized view. */
171  if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
172  ereport(ERROR,
173  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
174  errmsg("\"%s\" is not a materialized view",
175  RelationGetRelationName(matviewRel))));
176 
177  /* Check that CONCURRENTLY is not specified if not populated. */
178  if (concurrent && !RelationIsPopulated(matviewRel))
179  ereport(ERROR,
180  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
181  errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
182 
183  /* Check that conflicting options have not been specified. */
184  if (concurrent && stmt->skipData)
185  ereport(ERROR,
186  (errcode(ERRCODE_SYNTAX_ERROR),
187  errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
188 
189  /* We don't allow an oid column for a materialized view. */
190  Assert(!matviewRel->rd_rel->relhasoids);
191 
192  /*
193  * Check that everything is correct for a refresh. Problems at this point
194  * are internal errors, so elog is sufficient.
195  */
196  if (matviewRel->rd_rel->relhasrules == false ||
197  matviewRel->rd_rules->numLocks < 1)
198  elog(ERROR,
199  "materialized view \"%s\" is missing rewrite information",
200  RelationGetRelationName(matviewRel));
201 
202  if (matviewRel->rd_rules->numLocks > 1)
203  elog(ERROR,
204  "materialized view \"%s\" has too many rules",
205  RelationGetRelationName(matviewRel));
206 
207  rule = matviewRel->rd_rules->rules[0];
208  if (rule->event != CMD_SELECT || !(rule->isInstead))
209  elog(ERROR,
210  "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
211  RelationGetRelationName(matviewRel));
212 
213  actions = rule->actions;
214  if (list_length(actions) != 1)
215  elog(ERROR,
216  "the rule for materialized view \"%s\" is not a single action",
217  RelationGetRelationName(matviewRel));
218 
219  /*
220  * Check that there is a unique index with no WHERE clause on one or more
221  * columns of the materialized view if CONCURRENTLY is specified.
222  */
223  if (concurrent)
224  {
225  List *indexoidlist = RelationGetIndexList(matviewRel);
226  ListCell *indexoidscan;
227  bool hasUniqueIndex = false;
228 
229  foreach(indexoidscan, indexoidlist)
230  {
231  Oid indexoid = lfirst_oid(indexoidscan);
232  Relation indexRel;
233  Form_pg_index indexStruct;
234 
235  indexRel = index_open(indexoid, AccessShareLock);
236  indexStruct = indexRel->rd_index;
237 
238  if (indexStruct->indisunique &&
239  IndexIsValid(indexStruct) &&
240  RelationGetIndexExpressions(indexRel) == NIL &&
241  RelationGetIndexPredicate(indexRel) == NIL &&
242  indexStruct->indnatts > 0)
243  {
244  hasUniqueIndex = true;
245  index_close(indexRel, AccessShareLock);
246  break;
247  }
248 
249  index_close(indexRel, AccessShareLock);
250  }
251 
252  list_free(indexoidlist);
253 
254  if (!hasUniqueIndex)
255  ereport(ERROR,
256  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
257  errmsg("cannot refresh materialized view \"%s\" concurrently",
259  RelationGetRelationName(matviewRel))),
260  errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
261  }
262 
263  /*
264  * The stored query was rewritten at the time of the MV definition, but
265  * has not been scribbled on by the planner.
266  */
267  dataQuery = castNode(Query, linitial(actions));
268 
269  /*
270  * Check for active uses of the relation in the current transaction, such
271  * as open scans.
272  *
273  * NB: We count on this to protect us against problems with refreshing the
274  * data using HEAP_INSERT_FROZEN.
275  */
276  CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
277 
278  /*
279  * Tentatively mark the matview as populated or not (this will roll back
280  * if we fail later).
281  */
282  SetMatViewPopulatedState(matviewRel, !stmt->skipData);
283 
284  relowner = matviewRel->rd_rel->relowner;
285 
286  /*
287  * Switch to the owner's userid, so that any functions are run as that
288  * user. Also arrange to make GUC variable changes local to this command.
289  * Don't lock it down too tight to create a temporary table just yet. We
290  * will switch modes when we are about to execute user code.
291  */
292  GetUserIdAndSecContext(&save_userid, &save_sec_context);
293  SetUserIdAndSecContext(relowner,
294  save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
295  save_nestlevel = NewGUCNestLevel();
296 
297  /* Concurrent refresh builds new data in temp tablespace, and does diff. */
298  if (concurrent)
299  {
301  relpersistence = RELPERSISTENCE_TEMP;
302  }
303  else
304  {
305  tableSpace = matviewRel->rd_rel->reltablespace;
306  relpersistence = matviewRel->rd_rel->relpersistence;
307  }
308 
309  /*
310  * Create the transient table that will receive the regenerated data. Lock
311  * it against access by any other process until commit (by which time it
312  * will be gone).
313  */
314  OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
315  ExclusiveLock);
317  dest = CreateTransientRelDestReceiver(OIDNewHeap);
318 
319  /*
320  * Now lock down security-restricted operations.
321  */
322  SetUserIdAndSecContext(relowner,
323  save_sec_context | SECURITY_RESTRICTED_OPERATION);
324 
325  /* Generate the data, if wanted. */
326  if (!stmt->skipData)
327  processed = refresh_matview_datafill(dest, dataQuery, queryString);
328 
329  /* Make the matview match the newly generated data. */
330  if (concurrent)
331  {
332  int old_depth = matview_maintenance_depth;
333 
334  PG_TRY();
335  {
336  refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
337  save_sec_context);
338  }
339  PG_CATCH();
340  {
341  matview_maintenance_depth = old_depth;
342  PG_RE_THROW();
343  }
344  PG_END_TRY();
345  Assert(matview_maintenance_depth == old_depth);
346  }
347  else
348  {
349  refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
350 
351  /*
352  * Inform stats collector about our activity: basically, we truncated
353  * the matview and inserted some new data. (The concurrent code path
354  * above doesn't need to worry about this because the inserts and
355  * deletes it issues get counted by lower-level code.)
356  */
357  pgstat_count_truncate(matviewRel);
358  if (!stmt->skipData)
359  pgstat_count_heap_insert(matviewRel, processed);
360  }
361 
362  heap_close(matviewRel, NoLock);
363 
364  /* Roll back any GUC changes */
365  AtEOXact_GUC(false, save_nestlevel);
366 
367  /* Restore userid and security context */
368  SetUserIdAndSecContext(save_userid, save_sec_context);
369 
370  ObjectAddressSet(address, RelationRelationId, matviewOid);
371 
372  return address;
373 }
374 
375 /*
376  * refresh_matview_datafill
377  *
378  * Execute the given query, sending result rows to "dest" (which will
379  * insert them into the target matview).
380  *
381  * Returns number of rows inserted.
382  */
383 static uint64
385  const char *queryString)
386 {
387  List *rewritten;
388  PlannedStmt *plan;
389  QueryDesc *queryDesc;
390  Query *copied_query;
391  uint64 processed;
392 
393  /* Lock and rewrite, using a copy to preserve the original query. */
394  copied_query = copyObject(query);
395  AcquireRewriteLocks(copied_query, true, false);
396  rewritten = QueryRewrite(copied_query);
397 
398  /* SELECT should never rewrite to more or less than one SELECT query */
399  if (list_length(rewritten) != 1)
400  elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
401  query = (Query *) linitial(rewritten);
402 
403  /* Check for user-requested abort. */
405 
406  /* Plan the query which will generate data for the refresh. */
407  plan = pg_plan_query(query, 0, NULL);
408 
409  /*
410  * Use a snapshot with an updated command ID to ensure this query sees
411  * results of any previously executed queries. (This could only matter if
412  * the planner executed an allegedly-stable function that changed the
413  * database contents, but let's do it anyway to be safe.)
414  */
417 
418  /* Create a QueryDesc, redirecting output to our tuple receiver */
419  queryDesc = CreateQueryDesc(plan, queryString,
421  dest, NULL, 0);
422 
423  /* call ExecutorStart to prepare the plan for execution */
425 
426  /* run the plan */
427  ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
428 
429  processed = queryDesc->estate->es_processed;
430 
431  /* and clean up */
432  ExecutorFinish(queryDesc);
433  ExecutorEnd(queryDesc);
434 
435  FreeQueryDesc(queryDesc);
436 
438 
439  return processed;
440 }
441 
442 DestReceiver *
444 {
446 
447  self->pub.receiveSlot = transientrel_receive;
448  self->pub.rStartup = transientrel_startup;
449  self->pub.rShutdown = transientrel_shutdown;
450  self->pub.rDestroy = transientrel_destroy;
451  self->pub.mydest = DestTransientRel;
452  self->transientoid = transientoid;
453 
454  return (DestReceiver *) self;
455 }
456 
457 /*
458  * transientrel_startup --- executor startup
459  */
460 static void
461 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
462 {
463  DR_transientrel *myState = (DR_transientrel *) self;
464  Relation transientrel;
465 
466  transientrel = heap_open(myState->transientoid, NoLock);
467 
468  /*
469  * Fill private fields of myState for use by later routines
470  */
471  myState->transientrel = transientrel;
472  myState->output_cid = GetCurrentCommandId(true);
473 
474  /*
475  * We can skip WAL-logging the insertions, unless PITR or streaming
476  * replication is in use. We can skip the FSM in any case.
477  */
479  if (!XLogIsNeeded())
480  myState->hi_options |= HEAP_INSERT_SKIP_WAL;
481  myState->bistate = GetBulkInsertState();
482 
483  /* Not using WAL requires smgr_targblock be initially invalid */
485 }
486 
487 /*
488  * transientrel_receive --- receive one tuple
489  */
490 static bool
492 {
493  DR_transientrel *myState = (DR_transientrel *) self;
494  HeapTuple tuple;
495 
496  /*
497  * get the heap tuple out of the tuple table slot, making sure we have a
498  * writable copy
499  */
500  tuple = ExecMaterializeSlot(slot);
501 
502  heap_insert(myState->transientrel,
503  tuple,
504  myState->output_cid,
505  myState->hi_options,
506  myState->bistate);
507 
508  /* We know this is a newly created relation, so there are no indexes */
509 
510  return true;
511 }
512 
513 /*
514  * transientrel_shutdown --- executor end
515  */
516 static void
518 {
519  DR_transientrel *myState = (DR_transientrel *) self;
520 
521  FreeBulkInsertState(myState->bistate);
522 
523  /* If we skipped using WAL, must heap_sync before commit */
524  if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
525  heap_sync(myState->transientrel);
526 
527  /* close transientrel, but keep lock until commit */
528  heap_close(myState->transientrel, NoLock);
529  myState->transientrel = NULL;
530 }
531 
532 /*
533  * transientrel_destroy --- release DestReceiver object
534  */
535 static void
537 {
538  pfree(self);
539 }
540 
541 
542 /*
543  * Given a qualified temporary table name, append an underscore followed by
544  * the given integer, to make a new table name based on the old one.
545  *
546  * This leaks memory through palloc(), which won't be cleaned up until the
547  * current memory context is freed.
548  */
549 static char *
550 make_temptable_name_n(char *tempname, int n)
551 {
552  StringInfoData namebuf;
553 
554  initStringInfo(&namebuf);
555  appendStringInfoString(&namebuf, tempname);
556  appendStringInfo(&namebuf, "_%d", n);
557  return namebuf.data;
558 }
559 
560 static void
562 {
563  HeapTuple opertup;
564  Form_pg_operator operform;
565 
566  opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid));
567  if (!HeapTupleIsValid(opertup))
568  elog(ERROR, "cache lookup failed for operator %u", opoid);
569  operform = (Form_pg_operator) GETSTRUCT(opertup);
570  Assert(operform->oprkind == 'b');
571 
572  appendStringInfo(buf, "OPERATOR(%s.%s)",
573  quote_identifier(get_namespace_name(operform->oprnamespace)),
574  NameStr(operform->oprname));
575 
576  ReleaseSysCache(opertup);
577 }
578 
579 /*
580  * refresh_by_match_merge
581  *
582  * Refresh a materialized view with transactional semantics, while allowing
583  * concurrent reads.
584  *
585  * This is called after a new version of the data has been created in a
586  * temporary table. It performs a full outer join against the old version of
587  * the data, producing "diff" results. This join cannot work if there are any
588  * duplicated rows in either the old or new versions, in the sense that every
589  * column would compare as equal between the two rows. It does work correctly
590  * in the face of rows which have at least one NULL value, with all non-NULL
591  * columns equal. The behavior of NULLs on equality tests and on UNIQUE
592  * indexes turns out to be quite convenient here; the tests we need to make
593  * are consistent with default behavior. If there is at least one UNIQUE
594  * index on the materialized view, we have exactly the guarantee we need.
595  *
596  * The temporary table used to hold the diff results contains just the TID of
597  * the old record (if matched) and the ROW from the new table as a single
598  * column of complex record type (if matched).
599  *
600  * Once we have the diff table, we perform set-based DELETE and INSERT
601  * operations against the materialized view, and discard both temporary
602  * tables.
603  *
604  * Everything from the generation of the new data to applying the differences
605  * takes place under cover of an ExclusiveLock, since it seems as though we
606  * would want to prohibit not only concurrent REFRESH operations, but also
607  * incremental maintenance. It also doesn't seem reasonable or safe to allow
608  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
609  * this command.
610  */
611 static void
612 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
613  int save_sec_context)
614 {
615  StringInfoData querybuf;
616  Relation matviewRel;
617  Relation tempRel;
618  char *matviewname;
619  char *tempname;
620  char *diffname;
621  TupleDesc tupdesc;
622  bool foundUniqueIndex;
623  List *indexoidlist;
624  ListCell *indexoidscan;
625  int16 relnatts;
626  bool *usedForQual;
627 
628  initStringInfo(&querybuf);
629  matviewRel = heap_open(matviewOid, NoLock);
631  RelationGetRelationName(matviewRel));
632  tempRel = heap_open(tempOid, NoLock);
634  RelationGetRelationName(tempRel));
635  diffname = make_temptable_name_n(tempname, 2);
636 
637  relnatts = matviewRel->rd_rel->relnatts;
638  usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
639 
640  /* Open SPI context. */
641  if (SPI_connect() != SPI_OK_CONNECT)
642  elog(ERROR, "SPI_connect failed");
643 
644  /* Analyze the temp table with the new contents. */
645  appendStringInfo(&querybuf, "ANALYZE %s", tempname);
646  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
647  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
648 
649  /*
650  * We need to ensure that there are not duplicate rows without NULLs in
651  * the new data set before we can count on the "diff" results. Check for
652  * that in a way that allows showing the first duplicated row found. Even
653  * after we pass this test, a unique index on the materialized view may
654  * find a duplicate key problem.
655  */
656  resetStringInfo(&querybuf);
657  appendStringInfo(&querybuf,
658  "SELECT newdata FROM %s newdata "
659  "WHERE newdata IS NOT NULL AND EXISTS "
660  "(SELECT * FROM %s newdata2 WHERE newdata2 IS NOT NULL "
661  "AND newdata2 OPERATOR(pg_catalog.*=) newdata "
662  "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
663  "newdata.ctid) LIMIT 1",
664  tempname, tempname);
665  if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
666  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
667  if (SPI_processed > 0)
668  {
669  /*
670  * Note that this ereport() is returning data to the user. Generally,
671  * we would want to make sure that the user has been granted access to
672  * this data. However, REFRESH MAT VIEW is only able to be run by the
673  * owner of the mat view (or a superuser) and therefore there is no
674  * need to check for access to data in the mat view.
675  */
676  ereport(ERROR,
677  (errcode(ERRCODE_CARDINALITY_VIOLATION),
678  errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
679  RelationGetRelationName(matviewRel)),
680  errdetail("Row: %s",
682  }
683 
684  SetUserIdAndSecContext(relowner,
685  save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
686 
687  /* Start building the query for creating the diff table. */
688  resetStringInfo(&querybuf);
689  appendStringInfo(&querybuf,
690  "CREATE TEMP TABLE %s AS "
691  "SELECT mv.ctid AS tid, newdata "
692  "FROM %s mv FULL JOIN %s newdata ON (",
693  diffname, matviewname, tempname);
694 
695  /*
696  * Get the list of index OIDs for the table from the relcache, and look up
697  * each one in the pg_index syscache. We will test for equality on all
698  * columns present in all unique indexes which only reference columns and
699  * include all rows.
700  */
701  tupdesc = matviewRel->rd_att;
702  foundUniqueIndex = false;
703  indexoidlist = RelationGetIndexList(matviewRel);
704 
705  foreach(indexoidscan, indexoidlist)
706  {
707  Oid indexoid = lfirst_oid(indexoidscan);
708  Relation indexRel;
709  Form_pg_index indexStruct;
710 
711  indexRel = index_open(indexoid, RowExclusiveLock);
712  indexStruct = indexRel->rd_index;
713 
714  /*
715  * We're only interested if it is unique, valid, contains no
716  * expressions, and is not partial.
717  */
718  if (indexStruct->indisunique &&
719  IndexIsValid(indexStruct) &&
720  RelationGetIndexExpressions(indexRel) == NIL &&
721  RelationGetIndexPredicate(indexRel) == NIL)
722  {
723  int numatts = indexStruct->indnatts;
724  int i;
725 
726  /* Add quals for all columns from this index. */
727  for (i = 0; i < numatts; i++)
728  {
729  int attnum = indexStruct->indkey.values[i];
730  Oid type;
731  Oid op;
732  const char *colname;
733 
734  /*
735  * Only include the column once regardless of how many times
736  * it shows up in how many indexes.
737  */
738  if (usedForQual[attnum - 1])
739  continue;
740  usedForQual[attnum - 1] = true;
741 
742  /*
743  * Actually add the qual, ANDed with any others.
744  */
745  if (foundUniqueIndex)
746  appendStringInfoString(&querybuf, " AND ");
747 
748  colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname));
749  appendStringInfo(&querybuf, "newdata.%s ", colname);
750  type = attnumTypeId(matviewRel, attnum);
752  mv_GenerateOper(&querybuf, op);
753  appendStringInfo(&querybuf, " mv.%s", colname);
754 
755  foundUniqueIndex = true;
756  }
757  }
758 
759  /* Keep the locks, since we're about to run DML which needs them. */
760  index_close(indexRel, NoLock);
761  }
762 
763  list_free(indexoidlist);
764 
765  /*
766  * There must be at least one unique index on the matview.
767  *
768  * ExecRefreshMatView() checks that after taking the exclusive lock on the
769  * matview. So at least one unique index is guaranteed to exist here
770  * because the lock is still being held.
771  */
772  Assert(foundUniqueIndex);
773 
774  appendStringInfoString(&querybuf,
775  " AND newdata OPERATOR(pg_catalog.*=) mv) "
776  "WHERE newdata IS NULL OR mv IS NULL "
777  "ORDER BY tid");
778 
779  /* Create the temporary "diff" table. */
780  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
781  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
782 
783  SetUserIdAndSecContext(relowner,
784  save_sec_context | SECURITY_RESTRICTED_OPERATION);
785 
786  /*
787  * We have no further use for data from the "full-data" temp table, but we
788  * must keep it around because its type is referenced from the diff table.
789  */
790 
791  /* Analyze the diff table. */
792  resetStringInfo(&querybuf);
793  appendStringInfo(&querybuf, "ANALYZE %s", diffname);
794  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
795  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
796 
798 
799  /* Deletes must come before inserts; do them first. */
800  resetStringInfo(&querybuf);
801  appendStringInfo(&querybuf,
802  "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
803  "(SELECT diff.tid FROM %s diff "
804  "WHERE diff.tid IS NOT NULL "
805  "AND diff.newdata IS NULL)",
806  matviewname, diffname);
807  if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
808  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
809 
810  /* Inserts go last. */
811  resetStringInfo(&querybuf);
812  appendStringInfo(&querybuf,
813  "INSERT INTO %s SELECT (diff.newdata).* "
814  "FROM %s diff WHERE tid IS NULL",
815  matviewname, diffname);
816  if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
817  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
818 
819  /* We're done maintaining the materialized view. */
821  heap_close(tempRel, NoLock);
822  heap_close(matviewRel, NoLock);
823 
824  /* Clean up temp tables. */
825  resetStringInfo(&querybuf);
826  appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
827  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
828  elog(ERROR, "SPI_exec failed: %s", querybuf.data);
829 
830  /* Close SPI context. */
831  if (SPI_finish() != SPI_OK_FINISH)
832  elog(ERROR, "SPI_finish failed");
833 }
834 
835 /*
836  * Swap the physical files of the target and transient tables, then rebuild
837  * the target's indexes and throw away the transient table. Security context
838  * swapping is handled by the called function, so it is not needed here.
839  */
840 static void
841 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
842 {
843  finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
844  RecentXmin, ReadNextMultiXactId(), relpersistence);
845 }
846 
847 
848 /*
849  * This should be used to test whether the backend is in a context where it is
850  * OK to allow DML statements to modify materialized views. We only want to
851  * allow that for internal code driven by the materialized view definition,
852  * not for arbitrary user-supplied code.
853  *
854  * While the function names reflect the fact that their main intended use is
855  * incremental maintenance of materialized views (in response to changes to
856  * the data in referenced relations), they are initially used to allow REFRESH
857  * without blocking concurrent reads.
858  */
859 bool
861 {
862  return matview_maintenance_depth > 0;
863 }
864 
865 static void
867 {
869 }
870 
871 static void
873 {
876 }
signed short int16
Definition: c.h:255
#define RelationIsPopulated(relation)
Definition: rel.h:553
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:411
#define SPI_OK_CONNECT
Definition: spi.h:47
void RangeVarCallbackOwnsTable(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg)
Definition: tablecmds.c:12601
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:776
List * QueryRewrite(Query *parsetree)
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
int numLocks
Definition: prs2lock.h:42
EState * estate
Definition: execdesc.h:47
#define IndexIsValid(indexForm)
Definition: pg_index.h:107
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition: cluster.c:1469
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10185
#define SECURITY_RESTRICTED_OPERATION
Definition: miscadmin.h:292
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition: miscinit.c:395
DestReceiver pub
Definition: matview.c:48
int LOCKMODE
Definition: lockdefs.h:26
#define HEAP_INSERT_FROZEN
Definition: heapam.h:30
int SPI_connect(void)
Definition: spi.c:84
#define castNode(_type_, nodeptr)
Definition: nodes.h:575
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:103
Oid RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, bool nowait, RangeVarGetRelidCallback callback, void *callback_arg)
Definition: namespace.c:217
#define ExclusiveLock
Definition: lockdefs.h:44
Oid transientoid
Definition: matview.c:49
bool MatViewIncrementalMaintenanceIsEnabled(void)
Definition: matview.c:860
#define SPI_OK_DELETE
Definition: spi.h:54
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: matview.c:461
#define RelationRelationId
Definition: pg_class.h:29
#define XLogIsNeeded()
Definition: xlog.h:145
int SPI_finish(void)
Definition: spi.c:147
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:144
Form_pg_attribute * attrs
Definition: tupdesc.h:74
#define RELKIND_MATVIEW
Definition: pg_class.h:165
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:834
#define AccessShareLock
Definition: lockdefs.h:36
SPITupleTable * SPI_tuptable
Definition: spi.c:41
#define TYPECACHE_EQ_OPR
Definition: typcache.h:110
static void CloseMatViewIncrementalMaintenance(void)
Definition: matview.c:872
int errcode(int sqlerrcode)
Definition: elog.c:575
TransactionId RecentXmin
Definition: snapmgr.c:165
void heap_sync(Relation rel)
Definition: heapam.c:9138
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
#define heap_close(r, l)
Definition: heapam.h:97
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, int instrument_options)
Definition: pquery.c:66
Form_pg_class rd_rel
Definition: rel.h:114
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
HeapTuple * vals
Definition: spi.h:27
#define RelationGetTargetBlock(relation)
Definition: rel.h:489
bool isInstead
Definition: prs2lock.h:31
RangeVar * relation
Definition: parsenodes.h:3096
uint64 SPI_processed
Definition: spi.c:39
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:152
Oid GetDefaultTablespace(char relpersistence)
Definition: tablespace.c:1111
Definition: localtime.c:74
List * RelationGetIndexPredicate(Relation relation)
Definition: relcache.c:4753
void * copyObject(const void *from)
Definition: copyfuncs.c:4619
static char * make_temptable_name_n(char *tempname, int n)
Definition: matview.c:550
char * SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
Definition: spi.c:803
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:453
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:2322
Form_pg_index rd_index
Definition: rel.h:159
void pfree(void *pointer)
Definition: mcxt.c:950
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
#define linitial(l)
Definition: pg_list.h:110
#define EXEC_FLAG_WITHOUT_OIDS
Definition: executor.h:64
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
int SPI_exec(const char *src, long tcount)
Definition: spi.c:331
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once)
Definition: execMain.c:291
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:764
ItemPointerData t_self
Definition: htup.h:65
static void transientrel_destroy(DestReceiver *self)
Definition: matview.c:536
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:189
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3006
#define NoLock
Definition: lockdefs.h:34
static char * buf
Definition: pg_test_fsync.c:65
List * RelationGetIndexExpressions(Relation relation)
Definition: relcache.c:4690
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition: miscinit.c:388
#define RowExclusiveLock
Definition: lockdefs.h:38
CmdType event
Definition: prs2lock.h:27
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition: guc.c:5058
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define SPI_OK_UTILITY
Definition: spi.h:50
static void transientrel_shutdown(DestReceiver *self)
Definition: matview.c:517
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
Definition: matview.c:491
Oid attnumTypeId(Relation rd, int attid)
#define RelationGetRelationName(relation)
Definition: rel.h:437
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
void pgstat_count_truncate(Relation rel)
Definition: pgstat.c:1924
RewriteRule ** rules
Definition: prs2lock.h:43
void AcquireRewriteLocks(Query *parsetree, bool forExecute, bool forUpdatePushedDown)
List * actions
Definition: prs2lock.h:29
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition: tablecmds.c:2970
#define ereport(elevel, rest)
Definition: elog.h:122
Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate)
Definition: heapam.c:2399
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:393
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
Definition: matview.c:841
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:10271
FormData_pg_index * Form_pg_index
Definition: pg_index.h:67
#define InvalidSnapshot
Definition: snapshot.h:25
void * palloc0(Size size)
Definition: mcxt.c:878
void CommandCounterIncrement(void)
Definition: xact.c:922
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1116
static struct state * newstate(struct nfa *nfa)
Definition: regc_nfa.c:124
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
TupleDesc tupdesc
Definition: spi.h:26
#define SECURITY_LOCAL_USERID_CHANGE
Definition: miscadmin.h:291
TupleDesc rd_att
Definition: rel.h:115
Relation transientrel
Definition: matview.c:51
#define SPI_OK_SELECT
Definition: spi.h:51
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:191
int hi_options
Definition: matview.c:53
#define PG_CATCH()
Definition: elog.h:293
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:2336
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, const char *queryString)
Definition: matview.c:384
uint64 es_processed
Definition: execnodes.h:441
RuleLock * rd_rules
Definition: rel.h:118
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
#define InvalidBlockNumber
Definition: block.h:33
static int list_length(const List *l)
Definition: pg_list.h:89
#define SPI_OK_FINISH
Definition: spi.h:48
static void OpenMatViewIncrementalMaintenance(void)
Definition: matview.c:866
#define PG_RE_THROW()
Definition: elog.h:314
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:725
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4337
FormData_pg_operator* Form_pg_operator
Definition: pg_operator.h:57
static void mv_GenerateOper(StringInfo buf, Oid opoid)
Definition: matview.c:561
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:176
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:29
FormData_pg_class * Form_pg_class
Definition: pg_class.h:95
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:161
#define AccessExclusiveLock
Definition: lockdefs.h:46
DestReceiver * CreateTransientRelDestReceiver(Oid transientoid)
Definition: matview.c:443
int NewGUCNestLevel(void)
Definition: guc.c:5044
void pgstat_count_heap_insert(Relation rel, PgStat_Counter n)
Definition: pgstat.c:1823
int errmsg(const char *fmt,...)
Definition: elog.c:797
static int matview_maintenance_depth
Definition: matview.c:57
void list_free(List *list)
Definition: list.c:1133
int i
#define NameStr(name)
Definition: c.h:499
ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, char *completionTag)
Definition: matview.c:137
#define SPI_OK_INSERT
Definition: spi.h:53
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
CommandId output_cid
Definition: matview.c:52
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:687
#define elog
Definition: elog.h:219
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:105
#define RELPERSISTENCE_TEMP
Definition: pg_class.h:172
#define PG_TRY()
Definition: elog.h:284
Definition: pg_list.h:45
#define RelationGetRelid(relation)
Definition: rel.h:417
void SetMatViewPopulatedState(Relation relation, bool newstate)
Definition: matview.c:83
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:151
#define PG_END_TRY()
Definition: elog.h:300
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence, LOCKMODE lockmode)
Definition: cluster.c:608
#define lfirst_oid(lc)
Definition: pg_list.h:108
BulkInsertState bistate
Definition: matview.c:54
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, int save_sec_context)
Definition: matview.c:612
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:303
MultiXactId ReadNextMultiXactId(void)
Definition: multixact.c:721
#define RelationGetNamespace(relation)
Definition: rel.h:444
PlannedStmt * pg_plan_query(Query *querytree, int cursorOptions, ParamListInfo boundParams)
Definition: postgres.c:781