PostgreSQL Source Code git master
Loading...
Searching...
No Matches
repack.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * repack.c
4 * REPACK a table; formerly known as CLUSTER. VACUUM FULL also uses
5 * parts of this code.
6 *
7 * There are two somewhat different ways to rewrite a table. In non-
8 * concurrent mode, it's easy: take AccessExclusiveLock, create a new
9 * transient relation, copy the tuples over to the relfilenode of the new
10 * relation, swap the relfilenodes, then drop the old relation.
11 *
12 * In concurrent mode, we lock the table with only ShareUpdateExclusiveLock,
13 * then do an initial copy as above. However, while the tuples are being
14 * copied, concurrent transactions could modify the table. To cope with those
15 * changes, we rely on logical decoding to obtain them from WAL. A bgworker
16 * consumes WAL while the initial copy is ongoing (to prevent excessive WAL
17 * from being reserved), and accumulates the changes in a file. Once the
18 * initial copy is complete, we read the changes from the file and re-apply
19 * them on the new heap. Then we upgrade our ShareUpdateExclusiveLock to
20 * AccessExclusiveLock and swap the relfilenodes. This way, the time we hold
21 * a strong lock on the table is much reduced, and the bloat is eliminated.
22 *
23 *
24 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
25 * Portions Copyright (c) 1994-5, Regents of the University of California
26 *
27 *
28 * IDENTIFICATION
29 * src/backend/commands/repack.c
30 *
31 *-------------------------------------------------------------------------
32 */
33#include "postgres.h"
34
35#include "access/amapi.h"
36#include "access/heapam.h"
37#include "access/multixact.h"
38#include "access/relscan.h"
39#include "access/tableam.h"
41#include "access/transam.h"
42#include "access/xact.h"
43#include "access/xlog.h"
44#include "catalog/catalog.h"
45#include "catalog/dependency.h"
46#include "catalog/heap.h"
47#include "catalog/index.h"
48#include "catalog/namespace.h"
50#include "catalog/pg_am.h"
52#include "catalog/pg_inherits.h"
53#include "catalog/toasting.h"
54#include "commands/defrem.h"
55#include "commands/progress.h"
56#include "commands/repack.h"
58#include "commands/tablecmds.h"
59#include "commands/vacuum.h"
60#include "executor/executor.h"
61#include "libpq/pqformat.h"
62#include "libpq/pqmq.h"
63#include "miscadmin.h"
64#include "optimizer/optimizer.h"
65#include "pgstat.h"
67#include "storage/bufmgr.h"
68#include "storage/ipc.h"
69#include "storage/lmgr.h"
70#include "storage/predicate.h"
71#include "storage/proc.h"
72#include "utils/acl.h"
73#include "utils/fmgroids.h"
74#include "utils/guc.h"
76#include "utils/inval.h"
77#include "utils/lsyscache.h"
78#include "utils/memutils.h"
79#include "utils/pg_rusage.h"
80#include "utils/relmapper.h"
81#include "utils/snapmgr.h"
82#include "utils/syscache.h"
83#include "utils/wait_event_types.h"
84
85/*
86 * This struct is used to pass around the information on tables to be
87 * clustered. We need this so we can make a list of them when invoked without
88 * a specific table/index pair.
89 */
90typedef struct
91{
95
96/*
97 * The first file exported by the decoding worker must contain a snapshot;
98 * the following ones contain the data changes.
99 */
100#define WORKER_FILE_SNAPSHOT 0
101
102/*
103 * Information needed to apply concurrent data changes.
104 */
105typedef struct ChangeContext
106{
107 /* The relation the changes are applied to. */
109
110 /* Needed to update indexes of cc_rel. */
113
114 /*
115 * Existing tuples to UPDATE and DELETE are located via this index. We
116 * keep the scankey in partially initialized state to avoid repeated work.
117 * sk_argument is completed on the fly.
118 */
122
123 /* The last column we need to deform to obtain the tuple identity */
125
126 /* Sequential number of the file containing the changes. */
129
130/*
131 * Backend-local information to control the decoding worker.
132 */
133typedef struct DecodingWorker
134{
135 /* The worker. */
137
138 /* DecodingWorkerShared is in this segment. */
140
141 /* Handle of the error queue. */
144
145/* Pointer to currently running decoding worker. */
147
148/*
149 * Is there a message sent by a repack worker that the backend needs to
150 * receive?
151 */
153
154static LOCKMODE RepackLockLevel(bool concurrent);
156 Oid indexOid, Oid userid, LOCKMODE lmode,
157 int options);
161 Oid ident_idx);
163 Snapshot snapshot,
164 bool verbose,
168static List *get_tables_to_repack(RepackCommand cmd, bool usingindex,
171 Oid relid, bool rel_is_index,
174 Oid relid, Oid userid);
175
177static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
182static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot);
183static void restore_tuple(BufFile *file, Relation relation,
184 TupleTableSlot *slot);
185static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest,
186 TupleTableSlot *src);
188 TupleTableSlot *locator,
191 TupleTableSlot *locator,
193static void process_concurrent_changes(XLogRecPtr end_of_wal,
195 bool done);
197 Relation relation,
208 LOCKMODE lockmode,
209 bool isTopLevel,
210 ClusterParams *params);
211static Oid determine_clustered_index(Relation rel, bool usingindex,
212 const char *indexname);
213
214static void start_repack_decoding_worker(Oid relid);
215static void stop_repack_decoding_worker(void);
216static void stop_repack_decoding_worker_cb(int code, Datum arg);
218
219static void ProcessRepackMessage(StringInfo msg);
220static const char *RepackCommandAsString(RepackCommand cmd);
221
222
223/*
224 * The repack code allows for processing multiple tables at once. Because
225 * of this, we cannot just run everything in a single transaction, or we
226 * would be forced to acquire exclusive locks on all the tables being
227 * clustered, simultaneously --- very likely leading to deadlock.
228 *
229 * To solve this we follow a similar strategy to VACUUM code, processing each
230 * relation in a separate transaction. For this to work, we need to:
231 *
232 * - provide a separate memory context so that we can pass information in
233 * a way that survives across transactions
234 * - start a new transaction every time a new relation is clustered
235 * - check for validity of the information on to-be-clustered relations,
236 * as someone might have deleted a relation behind our back, or
237 * clustered one on a different index
238 * - end the transaction
239 *
240 * The single-relation case does not have any such overhead.
241 *
242 * We also allow a relation to be repacked following an index, but without
243 * naming a specific one. In that case, the indisclustered bit will be
244 * looked up, and an ERROR will be thrown if no so-marked index is found.
245 */
246void
248{
249 ClusterParams params = {0};
250 Relation rel = NULL;
252 LOCKMODE lockmode;
253 List *rtcs;
254 bool verbose = false;
255 bool analyze = false;
256 bool concurrently = false;
257
258 /* Parse option list */
259 foreach_node(DefElem, opt, stmt->params)
260 {
261 if (strcmp(opt->defname, "verbose") == 0)
262 verbose = defGetBoolean(opt);
263 else if (strcmp(opt->defname, "analyze") == 0 ||
264 strcmp(opt->defname, "analyse") == 0)
265 analyze = defGetBoolean(opt);
266 else if (strcmp(opt->defname, "concurrently") == 0)
267 {
268 if (stmt->command != REPACK_COMMAND_REPACK)
271 errmsg("CONCURRENTLY option not supported for %s",
272 RepackCommandAsString(stmt->command)));
274 }
275 else
278 errmsg("unrecognized %s option \"%s\"",
279 RepackCommandAsString(stmt->command),
280 opt->defname),
281 parser_errposition(pstate, opt->location));
282 }
283
284 params.options |=
285 (verbose ? CLUOPT_VERBOSE : 0) |
286 (analyze ? CLUOPT_ANALYZE : 0) |
288
289 /* Determine the lock mode to use. */
290 lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
291
292 if ((params.options & CLUOPT_CONCURRENT) != 0)
293 {
294 /*
295 * Make sure we're not in a transaction block.
296 *
297 * The reason is that repack_setup_logical_decoding() could wait
298 * indefinitely for our XID to complete. (The deadlock detector would
299 * not recognize it because we'd be waiting for ourselves, i.e. no
300 * real lock conflict.) It would be possible to run in a transaction
301 * block if we had no XID, but this restriction is simpler for users
302 * to understand and we don't lose any functionality.
303 */
304 PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
305 }
306
307 /*
308 * If a single relation is specified, process it and we're done ... unless
309 * the relation is a partitioned table, in which case we fall through.
310 */
311 if (stmt->relation != NULL)
312 {
313 rel = process_single_relation(stmt, lockmode, isTopLevel, &params);
314 if (rel == NULL)
315 return; /* all done */
316 }
317
318 /*
319 * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
320 * can add support for this later.
321 */
322 if (params.options & CLUOPT_ANALYZE)
325 errmsg("cannot execute %s on multiple tables",
326 "REPACK (ANALYZE)"));
327
328 /*
329 * By here, we know we are in a multi-table situation.
330 *
331 * Concurrent processing is currently considered rather special (e.g. in
332 * terms of resources consumed) so it is not performed in bulk.
333 */
334 if (params.options & CLUOPT_CONCURRENT)
335 {
336 if (rel != NULL)
337 {
338 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
341 errmsg("%s is not supported for partitioned tables",
342 "REPACK (CONCURRENTLY)"),
343 errhint("Consider running the command on individual partitions."));
344 }
345 else
348 errmsg("%s requires an explicit table name",
349 "REPACK (CONCURRENTLY)"));
350 }
351
352 /*
353 * In order to avoid holding locks for too long, we want to process each
354 * table in its own transaction. This forces us to disallow running
355 * inside a user transaction block.
356 */
358
359 /* Also, we need a memory context to hold our list of relations */
361 "Repack",
363
364 /*
365 * Since we open a new transaction for each relation, we have to check
366 * that the relation still is what we think it is.
367 *
368 * In single-transaction CLUSTER, we don't need the overhead.
369 */
370 params.options |= CLUOPT_RECHECK;
371
372 /*
373 * If we don't have a relation yet, determine a relation list. If we do,
374 * then it must be a partitioned table, and we want to process its
375 * partitions.
376 */
377 if (rel == NULL)
378 {
379 Assert(stmt->indexname == NULL);
380 rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
383 }
384 else
385 {
386 Oid relid;
387 bool rel_is_index;
388
389 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
390
391 /*
392 * If USING INDEX was specified, resolve the index name now and pass
393 * it down.
394 */
395 if (stmt->usingindex)
396 {
397 /*
398 * If no index name was specified when repacking a partitioned
399 * table, punt for now. Maybe we can improve this later.
400 */
401 if (!stmt->indexname)
402 {
403 if (stmt->command == REPACK_COMMAND_CLUSTER)
406 errmsg("there is no previously clustered index for table \"%s\"",
408 else
411 /*- translator: first %s is name of a SQL command, eg. REPACK */
412 errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
413 RepackCommandAsString(stmt->command),
415 }
416
417 relid = determine_clustered_index(rel, stmt->usingindex,
418 stmt->indexname);
419 if (!OidIsValid(relid))
420 elog(ERROR, "unable to determine index to cluster on");
422
423 rel_is_index = true;
424 }
425 else
426 {
427 relid = RelationGetRelid(rel);
428 rel_is_index = false;
429 }
430
432 relid, rel_is_index,
434
435 /* close parent relation, releasing lock on it */
437 rel = NULL;
438 }
439
440 /* Commit to get out of starting transaction */
443
444 /* Cluster the tables, each in a separate transaction */
445 Assert(rel == NULL);
447 {
448 /* Start a new transaction for each relation. */
450
451 /*
452 * Open the target table, coping with the case where it has been
453 * dropped.
454 */
455 rel = try_table_open(rtc->tableOid, lockmode);
456 if (rel == NULL)
457 {
459 continue;
460 }
461
462 /* functions in indexes may want a snapshot set */
464
465 /* Process this table */
466 cluster_rel(stmt->command, rel, rtc->indexOid, &params, isTopLevel);
467 /* cluster_rel closes the relation, but keeps lock */
468
471 }
472
473 /* Start a new transaction for the cleanup work. */
475
476 /* Clean up working storage */
478}
479
480/*
481 * In the non-concurrent case, we obtain AccessExclusiveLock throughout the
482 * operation to avoid any lock-upgrade hazards. In the concurrent case, we
483 * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the
484 * processing and only acquire AccessExclusiveLock at the end, to swap the
485 * relation -- supposedly for a short time.
486 */
487static LOCKMODE
488RepackLockLevel(bool concurrent)
489{
490 if (concurrent)
492 else
493 return AccessExclusiveLock;
494}
495
496/*
497 * cluster_rel
498 *
499 * This clusters the table by creating a new, clustered table and
500 * swapping the relfilenumbers of the new table and the old table, so
501 * the OID of the original table is preserved. Thus we do not lose
502 * GRANTs, inheritance, or references to this table.
503 *
504 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
505 * the new table, it's better to create the indexes afterwards than to fill
506 * them incrementally while we load the table.
507 *
508 * If indexOid is InvalidOid, the table will be rewritten in physical order
509 * instead of index order.
510 *
511 * Note that, in the concurrent case, the function releases the lock at some
512 * point, in order to get AccessExclusiveLock for the final steps (i.e. to
513 * swap the relation files). To make things simpler, the caller should expect
514 * OldHeap to be closed on return, regardless of CLUOPT_CONCURRENT. (The
515 * AccessExclusiveLock is kept till the end of the transaction.)
516 *
517 * 'cmd' indicates which command is being executed, to be used for error
518 * messages.
519 */
520void
522 ClusterParams *params, bool isTopLevel)
523{
524 Oid tableOid = RelationGetRelid(OldHeap);
527 Oid save_userid;
528 int save_sec_context;
529 int save_nestlevel;
530 bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
531 bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
532 bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
534
535 /* Determine the lock mode to use. */
536 lmode = RepackLockLevel(concurrent);
537
538 /*
539 * Check some preconditions in the concurrent case. This also obtains the
540 * replica index OID.
541 */
542 if (concurrent)
544
545 /* Check for user-requested abort. */
547
550
551 /*
552 * Switch to the table owner's userid, so that any index functions are run
553 * as that user. Also lock down security-restricted operations and
554 * arrange to make GUC variable changes local to this command.
555 */
556 GetUserIdAndSecContext(&save_userid, &save_sec_context);
557 SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
558 save_sec_context | SECURITY_RESTRICTED_OPERATION);
559 save_nestlevel = NewGUCNestLevel();
561
562 /*
563 * Recheck that the relation is still what it was when we started.
564 *
565 * Note that it's critical to skip this in single-relation CLUSTER;
566 * otherwise, we would reject an attempt to cluster using a
567 * not-previously-clustered index.
568 */
569 if (recheck &&
570 !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
571 lmode, params->options))
572 goto out;
573
574 /*
575 * We allow repacking shared catalogs only when not using an index. It
576 * would work to use an index in most respects, but the index would only
577 * get marked as indisclustered in the current database, leading to
578 * unexpected behavior if CLUSTER were later invoked in another database.
579 */
580 if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
583 /*- translator: first %s is name of a SQL command, eg. REPACK */
584 errmsg("cannot execute %s on a shared catalog",
586
587 /*
588 * The CONCURRENTLY case should have been rejected earlier because it does
589 * not support system catalogs.
590 */
591 Assert(!(OldHeap->rd_rel->relisshared && concurrent));
592
593 /*
594 * Don't process temp tables of other backends ... their local buffer
595 * manager is not going to cope.
596 */
600 /*- translator: first %s is name of a SQL command, eg. REPACK */
601 errmsg("cannot execute %s on temporary tables of other sessions",
603
604 /*
605 * Also check for active uses of the relation in the current transaction,
606 * including open scans and pending AFTER trigger events.
607 */
609
610 /* Check heap and index are valid to cluster on */
611 if (OidIsValid(indexOid))
612 {
613 /* verify the index is good and lock it */
615 /* also open it */
616 index = index_open(indexOid, NoLock);
617 }
618 else
619 index = NULL;
620
621 /*
622 * When allow_system_table_mods is turned off, we disallow repacking a
623 * catalog on a particular index unless that's already the clustered index
624 * for that catalog.
625 *
626 * XXX We don't check for this in CLUSTER, because it's historically been
627 * allowed.
628 */
629 if (cmd != REPACK_COMMAND_CLUSTER &&
630 !allowSystemTableMods && OidIsValid(indexOid) &&
631 IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
634 errmsg("permission denied: \"%s\" is a system catalog",
636 errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
637 "allow_system_table_mods"));
638
639 /*
640 * Quietly ignore the request if this is a materialized view which has not
641 * been populated from its query. No harm is done because there is no data
642 * to deal with, and we don't want to throw an error if this is part of a
643 * multi-relation request -- for example, CLUSTER was run on the entire
644 * database.
645 */
646 if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
648 {
649 if (index)
652 goto out;
653 }
654
655 Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
656 OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
657 OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
658
659 /*
660 * All predicate locks on the tuples or pages are about to be made
661 * invalid, because we move tuples around. Promote them to relation
662 * locks. Predicate locks on indexes will be promoted when they are
663 * reindexed.
664 *
665 * During concurrent processing, the heap as well as its indexes stay in
666 * operation, so we postpone this step until they are locked using
667 * AccessExclusiveLock near the end of the processing.
668 */
669 if (!concurrent)
671
672 /*
673 * rebuild_relation does all the dirty work, and closes OldHeap and index,
674 * if valid.
675 *
676 * In concurrent mode, make sure the worker terminates; normally it does
677 * so by itself, but a PG_ENSURE_ERROR_CLEANUP callback ensures that this
678 * happens even in case this backend dies early on a FATAL exit. Normal
679 * mode doesn't need that overhead.
680 */
681 if (concurrent)
682 {
684 {
686 }
689 }
690 else
692
693out:
694 /* Roll back any GUC changes executed by index functions */
695 AtEOXact_GUC(false, save_nestlevel);
696
697 /* Restore userid and security context */
698 SetUserIdAndSecContext(save_userid, save_sec_context);
699
701}
702
703/*
704 * Check if the table (and its index) still meets the requirements of
705 * cluster_rel().
706 */
707static bool
709 Oid userid, LOCKMODE lmode, int options)
710{
711 Oid tableOid = RelationGetRelid(OldHeap);
712
713 /* Check that the user still has privileges for the relation */
714 if (!repack_is_permitted_for_relation(cmd, tableOid, userid))
715 {
717 return false;
718 }
719
720 /*
721 * Silently skip a temp table for a remote session. Only doing this check
722 * in the "recheck" case is appropriate (which currently means somebody is
723 * executing a database-wide CLUSTER or on a partitioned table), because
724 * there is another check in cluster() which will stop any attempt to
725 * cluster remote temp tables by name. There is another check in
726 * cluster_rel which is redundant, but we leave it for extra safety.
727 */
729 {
731 return false;
732 }
733
734 if (OidIsValid(indexOid))
735 {
736 /*
737 * Check that the index still exists
738 */
740 {
742 return false;
743 }
744
745 /*
746 * Check that the index is still the one with indisclustered set, if
747 * needed.
748 */
749 if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 &&
750 !get_index_isclustered(indexOid))
751 {
753 return false;
754 }
755 }
756
757 return true;
758}
759
760/*
761 * Verify that the specified heap and index are valid to cluster on
762 *
763 * Side effect: obtains lock on the index. The caller may
764 * in some cases already have a lock of the same strength on the table, but
765 * not in all cases so we can't rely on the table-level lock for
766 * protection here.
767 */
768void
770{
772
773 OldIndex = index_open(indexOid, lockmode);
774
775 /*
776 * Check that index is in fact an index on the given relation
777 */
778 if (OldIndex->rd_index == NULL ||
779 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
782 errmsg("\"%s\" is not an index for table \"%s\"",
785
786 /* Index AM must allow clustering */
787 if (!OldIndex->rd_indam->amclusterable)
790 errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
792
793 /*
794 * Disallow clustering on incomplete indexes (those that might not index
795 * every row of the relation). We could relax this by making a separate
796 * seqscan pass over the table to copy the missing rows, but that seems
797 * expensive and tedious.
798 */
799 if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
802 errmsg("cannot cluster on partial index \"%s\"",
804
805 /*
806 * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
807 * it might well not contain entries for every heap row, or might not even
808 * be internally consistent. (But note that we don't check indcheckxmin;
809 * the worst consequence of following broken HOT chains would be that we
810 * might put recently-dead tuples out-of-order in the new table, and there
811 * is little harm in that.)
812 */
813 if (!OldIndex->rd_index->indisvalid)
816 errmsg("cannot cluster on invalid index \"%s\"",
818
819 /* Drop relcache refcnt on OldIndex, but keep lock */
821}
822
823/*
824 * mark_index_clustered: mark the specified index as the one clustered on
825 *
826 * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
827 */
828void
829mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
830{
835
836 Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
837
838 /*
839 * If the index is already marked clustered, no need to do anything.
840 */
841 if (OidIsValid(indexOid))
842 {
843 if (get_index_isclustered(indexOid))
844 return;
845 }
846
847 /*
848 * Check each index of the relation and set/clear the bit as needed.
849 */
851
852 foreach(index, RelationGetIndexList(rel))
853 {
855
859 elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
861
862 /*
863 * Unset the bit if set. We know it's wrong because we checked this
864 * earlier.
865 */
866 if (indexForm->indisclustered)
867 {
868 indexForm->indisclustered = false;
870 }
871 else if (thisIndexOid == indexOid)
872 {
873 /* this was checked earlier, but let's be real sure */
874 if (!indexForm->indisvalid)
875 elog(ERROR, "cannot cluster on invalid index %u", indexOid);
876 indexForm->indisclustered = true;
878 }
879
881 InvalidOid, is_internal);
882
884 }
885
887}
888
889/*
890 * Check if the CONCURRENTLY option is legal for the relation.
891 *
892 * *Ident_idx_p receives the OID of the identity index.
893 */
894static void
896{
897 char relpersistence,
898 replident;
900
904 errmsg("cannot execute %s in this configuration",
905 "REPACK (CONCURRENTLY)"),
906 errdetail("%s requires \"wal_level\" to be set to \"replica\" or higher.",
907 "REPACK (CONCURRENTLY)"));
908
909 /* Data changes in system relations are not logically decoded. */
910 if (IsCatalogRelation(rel))
913 errmsg("cannot execute %s on relation \"%s\"",
914 "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
915 errhint("%s is not supported for catalog relations.",
916 "REPACK (CONCURRENTLY)"));
917
918 /*
919 * reorderbuffer.c does not seem to handle processing of TOAST relation
920 * alone.
921 */
922 if (IsToastRelation(rel))
925 errmsg("cannot execute %s on relation \"%s\"",
926 "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
927 errhint("%s is not supported for TOAST relations.",
928 "REPACK (CONCURRENTLY)"));
929
930 relpersistence = rel->rd_rel->relpersistence;
931 if (relpersistence != RELPERSISTENCE_PERMANENT)
934 errmsg("cannot execute %s on relation \"%s\"",
935 "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
936 errhint("%s is only allowed for permanent relations.",
937 "REPACK (CONCURRENTLY)"));
938
939 /*
940 * With NOTHING, WAL does not contain the old tuple; FULL is not yet
941 * supported.
942 */
943 replident = rel->rd_rel->relreplident;
944 if (replident == REPLICA_IDENTITY_NOTHING ||
945 replident == REPLICA_IDENTITY_FULL)
948 errmsg("cannot execute %s on relation \"%s\"",
949 "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
950 errdetail("%s does not support tables with %s.",
951 "REPACK (CONCURRENTLY)",
952 replident == REPLICA_IDENTITY_NOTHING ?
953 "REPLICA IDENTITY NOTHING" : "REPLICA IDENTITY FULL"));
954
955 /*
956 * Obtain the replica identity index -- either one that has been set
957 * explicitly, or a non-deferrable primary key. If none of these cases
958 * apply, the table cannot be repacked concurrently. It might be possible
959 * to have repack work with a FULL replica identity; however that requires
960 * more work and is not implemented yet.
961 */
963 if (!OidIsValid(ident_idx))
964 {
965 /* This special case warrants its own error message */
966 if (OidIsValid(rel->rd_pkindex) && rel->rd_ispkdeferrable)
969 errmsg("cannot execute %s on relation \"%s\"",
970 "REPACK (CONCURRENTLY)",
972 errdetail("%s does not support deferrable primary keys.",
973 "REPACK (CONCURRENTLY)"),
974 errhint("Use ALTER TABLE ... REPLICA IDENTITY USING INDEX to designate another index as replica identity."));
975
978 errmsg("cannot execute %s on relation \"%s\"",
979 "REPACK (CONCURRENTLY)", RelationGetRelationName(rel)),
980 errhint("Relation \"%s\" has no identity index.",
982 }
983
985}
986
987
988/*
989 * rebuild_relation: rebuild an existing relation in index or physical order
990 *
991 * OldHeap: table to rebuild. See cluster_rel() for comments on the required
992 * lock strength.
993 *
994 * index: index to cluster by, or NULL to rewrite in physical order.
995 *
996 * ident_idx: identity index, to handle replaying of concurrent data changes
997 * to the new heap. InvalidOid if there's no CONCURRENTLY option.
998 *
999 * On entry, heap and index (if one is given) must be open, and the
1000 * appropriate lock held on them -- AccessExclusiveLock for exclusive
1001 * processing and ShareUpdateExclusiveLock for concurrent processing.
1002 *
1003 * On exit, they are closed, but still locked with AccessExclusiveLock.
1004 * (The function handles the lock upgrade if 'concurrent' is true.)
1005 */
1006static void
1008 Oid ident_idx)
1009{
1010 Oid tableOid = RelationGetRelid(OldHeap);
1011 Oid accessMethod = OldHeap->rd_rel->relam;
1012 Oid tableSpace = OldHeap->rd_rel->reltablespace;
1015 char relpersistence;
1019 bool concurrent = OidIsValid(ident_idx);
1020 Snapshot snapshot = NULL;
1021#if USE_ASSERT_CHECKING
1023
1024 lmode = RepackLockLevel(concurrent);
1025
1028#endif
1029
1030 if (concurrent)
1031 {
1032 /*
1033 * The worker needs to be a member of the locking group we're the leader
1034 * of. We ought to become the leader before the worker starts. The
1035 * worker will join the group as soon as it starts.
1036 *
1037 * This is to make sure that the deadlock described below is
1038 * detectable by deadlock.c: if the worker waits for a transaction to
1039 * complete and we are waiting for the worker output, then effectively
1040 * we (i.e. this backend) are waiting for that transaction.
1041 */
1043
1044 /*
1045 * Start the worker that decodes data changes applied while we're
1046 * copying the table contents.
1047 *
1048 * Note that the worker has to wait for all transactions with XID
1049 * already assigned to finish. If any of those transactions is
1050 * waiting for a lock conflicting with ShareUpdateExclusiveLock on our
1051 * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
1052 * Not sure this risk is worth unlocking/locking the table (and its
1053 * clustering index) and checking again if it's still eligible for
1054 * REPACK CONCURRENTLY.
1055 */
1057
1058 /*
1059 * Wait until the worker has the initial snapshot and retrieve it.
1060 */
1062
1063 PushActiveSnapshot(snapshot);
1064 }
1065
1066 /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */
1067 if (index != NULL)
1069
1070 /* Remember info about rel before closing OldHeap */
1071 relpersistence = OldHeap->rd_rel->relpersistence;
1072
1073 /*
1074 * Create the transient table that will receive the re-ordered data.
1075 *
1076 * OldHeap is already locked, so no need to lock it again. make_new_heap
1077 * obtains AccessExclusiveLock on the new heap and its toast table.
1078 */
1079 OIDNewHeap = make_new_heap(tableOid, tableSpace,
1080 accessMethod,
1081 relpersistence,
1082 NoLock);
1085
1086 /* Copy the heap data into the new table in the desired order */
1089
1090 /* The historic snapshot won't be needed anymore. */
1091 if (snapshot)
1092 {
1095 }
1096
1097 if (concurrent)
1098 {
1100
1101 /*
1102 * Close the index, but keep the lock. Both heaps will be closed by
1103 * the following call.
1104 */
1105 if (index)
1107
1110
1113 }
1114 else
1115 {
1117
1118 /* Close relcache entries, but keep lock until transaction commit */
1120 if (index)
1122
1123 /*
1124 * Close the new relation so it can be dropped as soon as the storage
1125 * is swapped. The relation is not visible to others, so no need to
1126 * unlock it explicitly.
1127 */
1129
1130 /*
1131 * Swap the physical files of the target and transient tables, then
1132 * rebuild the target's indexes and throw away the transient table.
1133 */
1135 swap_toast_by_content, false, true,
1136 true, /* reindex */
1138 relpersistence);
1139 }
1140}
1141
1142
1143/*
1144 * Create the transient table that will be filled with new data during
1145 * CLUSTER, ALTER TABLE, and similar operations. The transient table
1146 * duplicates the logical structure of the OldHeap; but will have the
1147 * specified physical storage properties NewTableSpace, NewAccessMethod, and
1148 * relpersistence.
1149 *
1150 * After this, the caller should load the new heap with transferred/modified
1151 * data, then call finish_heap_swap to complete the operation.
1152 */
1153Oid
1155 char relpersistence, LOCKMODE lockmode)
1156{
1160 Oid toastid;
1162 HeapTuple tuple;
1163 Datum reloptions;
1164 bool isNull;
1166
1167 OldHeap = table_open(OIDOldHeap, lockmode);
1169
1170 /*
1171 * Note that the NewHeap will not receive any of the defaults or
1172 * constraints associated with the OldHeap; we don't need 'em, and there's
1173 * no reason to spend cycles inserting them into the catalogs only to
1174 * delete them.
1175 */
1176
1177 /*
1178 * But we do want to use reloptions of the old heap for new heap.
1179 */
1181 if (!HeapTupleIsValid(tuple))
1182 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1183 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1184 &isNull);
1185 if (isNull)
1186 reloptions = (Datum) 0;
1187
1188 if (relpersistence == RELPERSISTENCE_TEMP)
1190 else
1192
1193 /*
1194 * Create the new heap, using a temporary name in the same namespace as
1195 * the existing table. NOTE: there is some risk of collision with user
1196 * relnames. Working around this seems more trouble than it's worth; in
1197 * particular, we can't create the new heap in a different namespace from
1198 * the old, or we will have problems with the TEMP status of temp tables.
1199 *
1200 * Note: the new heap is not a shared relation, even if we are rebuilding
1201 * a shared rel. However, we do make the new heap mapped if the source is
1202 * mapped. This simplifies swap_relation_files, and is absolutely
1203 * necessary for rebuilding pg_class, for reasons explained there.
1204 */
1205 snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1206
1210 InvalidOid,
1211 InvalidOid,
1212 InvalidOid,
1213 OldHeap->rd_rel->relowner,
1216 NIL,
1218 relpersistence,
1219 false,
1222 reloptions,
1223 false,
1224 true,
1225 true,
1226 OIDOldHeap,
1227 NULL);
1229
1230 ReleaseSysCache(tuple);
1231
1232 /*
1233 * Advance command counter so that the newly-created relation's catalog
1234 * tuples will be visible to table_open.
1235 */
1237
1238 /*
1239 * If necessary, create a TOAST table for the new relation.
1240 *
1241 * If the relation doesn't have a TOAST table already, we can't need one
1242 * for the new relation. The other way around is possible though: if some
1243 * wide columns have been dropped, NewHeapCreateToastTable can decide that
1244 * no TOAST table is needed for the new table.
1245 *
1246 * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1247 * that the TOAST table will be visible for insertion.
1248 */
1249 toastid = OldHeap->rd_rel->reltoastrelid;
1250 if (OidIsValid(toastid))
1251 {
1252 /* keep the existing toast table's reloptions, if any */
1254 if (!HeapTupleIsValid(tuple))
1255 elog(ERROR, "cache lookup failed for relation %u", toastid);
1256 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1257 &isNull);
1258 if (isNull)
1259 reloptions = (Datum) 0;
1260
1261 NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1262
1263 ReleaseSysCache(tuple);
1264 }
1265
1267
1268 return OIDNewHeap;
1269}
1270
1271/*
1272 * Do the physical copying of table data.
1273 *
1274 * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass
1275 * iff concurrent processing is required.
1276 *
1277 * There are three output parameters:
1278 * *pSwapToastByContent is set true if toast tables must be swapped by content.
1279 * *pFreezeXid receives the TransactionId used as freeze cutoff point.
1280 * *pCutoffMulti receives the MultiXactId used as a cutoff point.
1281 */
1282static void
1284 Snapshot snapshot, bool verbose, bool *pSwapToastByContent,
1286{
1292 VacuumParams params;
1293 struct VacuumCutoffs cutoffs;
1294 bool use_sort;
1295 double num_tuples = 0,
1296 tups_vacuumed = 0,
1298 BlockNumber num_pages;
1299 int elevel = verbose ? INFO : DEBUG2;
1300 PGRUsage ru0;
1301 char *nspname;
1302 bool concurrent = snapshot != NULL;
1304
1305 lmode = RepackLockLevel(concurrent);
1306
1308
1309 /* Store a copy of the namespace name for logging purposes */
1311
1312 /*
1313 * Their tuple descriptors should be exactly alike, but here we only need
1314 * assume that they have the same number of columns.
1315 */
1318 Assert(newTupDesc->natts == oldTupDesc->natts);
1319
1320 /*
1321 * If the OldHeap has a toast table, get lock on the toast table to keep
1322 * it from being vacuumed. This is needed because autovacuum processes
1323 * toast tables independently of their main tables, with no lock on the
1324 * latter. If an autovacuum were to start on the toast table after we
1325 * compute our OldestXmin below, it would use a later OldestXmin, and then
1326 * possibly remove as DEAD toast tuples belonging to main tuples we think
1327 * are only RECENTLY_DEAD. Then we'd fail while trying to copy those
1328 * tuples.
1329 *
1330 * We don't need to open the toast relation here, just lock it. The lock
1331 * will be held till end of transaction.
1332 */
1333 if (OldHeap->rd_rel->reltoastrelid)
1334 LockRelationOid(OldHeap->rd_rel->reltoastrelid, lmode);
1335
1336 /*
1337 * If both tables have TOAST tables, perform toast swap by content. It is
1338 * possible that the old table has a toast table but the new one doesn't,
1339 * if toastable columns have been dropped. In that case we have to do
1340 * swap by links. This is okay because swap by content is only essential
1341 * for system catalogs, and we don't support schema changes for them.
1342 */
1343 if (OldHeap->rd_rel->reltoastrelid && NewHeap->rd_rel->reltoastrelid &&
1344 !concurrent)
1345 {
1346 *pSwapToastByContent = true;
1347
1348 /*
1349 * When doing swap by content, any toast pointers written into NewHeap
1350 * must use the old toast table's OID, because that's where the toast
1351 * data will eventually be found. Set this up by setting rd_toastoid.
1352 * This also tells toast_save_datum() to preserve the toast value
1353 * OIDs, which we want so as not to invalidate toast pointers in
1354 * system catalog caches, and to avoid making multiple copies of a
1355 * single toast value.
1356 *
1357 * Note that we must hold NewHeap open until we are done writing data,
1358 * since the relcache will not guarantee to remember this setting once
1359 * the relation is closed. Also, this technique depends on the fact
1360 * that no one will try to read from the NewHeap until after we've
1361 * finished writing it and swapping the rels --- otherwise they could
1362 * follow the toast pointers to the wrong place. (It would actually
1363 * work for values copied over from the old toast table, but not for
1364 * any values that we toast which were previously not toasted.)
1365 *
1366 * This would not work with CONCURRENTLY because we may need to delete
1367 * TOASTed tuples from the new heap. With this hack, we'd delete them
1368 * from the old heap.
1369 */
1370 NewHeap->rd_toastoid = OldHeap->rd_rel->reltoastrelid;
1371 }
1372 else
1373 *pSwapToastByContent = false;
1374
1375 /*
1376 * Compute xids used to freeze and weed out dead tuples and multixacts.
1377 * Since we're going to rewrite the whole table anyway, there's no reason
1378 * not to be aggressive about this.
1379 */
1380 memset(&params, 0, sizeof(VacuumParams));
1381 vacuum_get_cutoffs(OldHeap, &params, &cutoffs);
1382
1383 /*
1384 * FreezeXid will become the table's new relfrozenxid, and that mustn't go
1385 * backwards, so take the max.
1386 */
1387 {
1388 TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid;
1389
1392 cutoffs.FreezeLimit = relfrozenxid;
1393 }
1394
1395 /*
1396 * MultiXactCutoff, similarly, shouldn't go backwards either.
1397 */
1398 {
1399 MultiXactId relminmxid = OldHeap->rd_rel->relminmxid;
1400
1403 cutoffs.MultiXactCutoff = relminmxid;
1404 }
1405
1406 /*
1407 * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
1408 * the OldHeap. We know how to use a sort to duplicate the ordering of a
1409 * btree index, and will use seqscan-and-sort for that case if the planner
1410 * tells us it's cheaper. Otherwise, always indexscan if an index is
1411 * provided, else plain seqscan.
1412 */
1413 if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
1416 else
1417 use_sort = false;
1418
1419 /* Log what we're doing */
1420 if (OldIndex != NULL && !use_sort)
1421 ereport(elevel,
1422 errmsg("repacking \"%s.%s\" using index scan on \"%s\"",
1423 nspname,
1426 else if (use_sort)
1427 ereport(elevel,
1428 errmsg("repacking \"%s.%s\" using sequential scan and sort",
1429 nspname,
1431 else
1432 ereport(elevel,
1433 errmsg("repacking \"%s.%s\" in physical order",
1434 nspname,
1436
1437 /*
1438 * Hand off the actual copying to an AM-specific function; the generic code
1439 * cannot know how to deal with visibility across AMs. Note that this
1440 * routine is allowed to set FreezeXid / MultiXactCutoff to different
1441 * values (e.g. because the AM doesn't use freezing).
1442 */
1444 cutoffs.OldestXmin, snapshot,
1445 &cutoffs.FreezeLimit,
1446 &cutoffs.MultiXactCutoff,
1447 &num_tuples, &tups_vacuumed,
1449
1450 /* return selected values to caller; they get set as relfrozenxid/relminmxid */
1451 *pFreezeXid = cutoffs.FreezeLimit;
1452 *pCutoffMulti = cutoffs.MultiXactCutoff;
1453
1454 /*
1455 * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again.
1456 * In the CONCURRENTLY case, we need to set it again before applying the
1457 * concurrent changes.
1458 */
1459 NewHeap->rd_toastoid = InvalidOid;
1460
1462
1463 /* Log what we did */
1464 ereport(elevel,
1465 (errmsg("\"%s.%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1466 nspname,
1468 tups_vacuumed, num_tuples,
1470 errdetail("%.0f dead row versions cannot be removed yet.\n"
1471 "%s.",
1473 pg_rusage_show(&ru0))));
1474
1475 /* Update pg_class to reflect the correct values of pages and tuples. */
1477
1481 elog(ERROR, "cache lookup failed for relation %u",
1484
1485 relform->relpages = num_pages;
1486 relform->reltuples = num_tuples;
1487
1488 /* Don't update the stats for pg_class. See swap_relation_files. */
1491 else
1493
1494 /* Clean up. */
1497
1498 /* Make the update visible */
1500}
1501
1502/*
1503 * Swap the physical files of two given relations.
1504 *
1505 * We swap the physical identity (reltablespace, relfilenumber) while keeping
1506 * the same logical identities of the two relations. relpersistence is also
1507 * swapped, which is critical since it determines where buffers live for each
1508 * relation.
1509 *
1510 * We can swap associated TOAST data in either of two ways: recursively swap
1511 * the physical content of the toast tables (and their indexes), or swap the
1512 * TOAST links in the given relations' pg_class entries. The former is needed
1513 * to manage rewrites of shared catalogs (where we cannot change the pg_class
1514 * links) while the latter is the only way to handle cases in which a toast
1515 * table is added or removed altogether.
1516 *
1517 * Additionally, the first relation is marked with relfrozenxid set to
1518 * frozenXid. It seems a bit ugly to have this here, but the caller would
1519 * have to do it anyway, so having it here saves a heap_update. Note: in
1520 * the swap-toast-links case, we assume we don't need to change the toast
1521 * table's relfrozenxid: the new version of the toast table should already
1522 * have relfrozenxid set to RecentXmin, which is good enough.
1523 *
1524 * Lastly, if r2 and its toast table and toast index (if any) are mapped,
1525 * their OIDs are emitted into mapped_tables[]. This is hacky but beats
1526 * having to look the information up again later in finish_heap_swap.
1527 */
1528static void
1531 bool is_internal,
1535{
1538 reltup2;
1540 relform2;
1544 char swptmpchr;
1545 Oid relam1,
1546 relam2;
1547
1548 /* We need writable copies of both pg_class tuples. */
1550
1553 elog(ERROR, "cache lookup failed for relation %u", r1);
1555
1558 elog(ERROR, "cache lookup failed for relation %u", r2);
1560
1561 relfilenumber1 = relform1->relfilenode;
1562 relfilenumber2 = relform2->relfilenode;
1563 relam1 = relform1->relam;
1564 relam2 = relform2->relam;
1565
1568 {
1569 /*
1570 * Normal non-mapped relations: swap relfilenumbers, reltablespaces,
1571 * relpersistence
1572 */
1574
1575 swaptemp = relform1->relfilenode;
1576 relform1->relfilenode = relform2->relfilenode;
1577 relform2->relfilenode = swaptemp;
1578
1579 swaptemp = relform1->reltablespace;
1580 relform1->reltablespace = relform2->reltablespace;
1581 relform2->reltablespace = swaptemp;
1582
1583 swaptemp = relform1->relam;
1584 relform1->relam = relform2->relam;
1585 relform2->relam = swaptemp;
1586
1587 swptmpchr = relform1->relpersistence;
1588 relform1->relpersistence = relform2->relpersistence;
1589 relform2->relpersistence = swptmpchr;
1590
1591 /* Also swap toast links, if we're swapping by links */
1593 {
1594 swaptemp = relform1->reltoastrelid;
1595 relform1->reltoastrelid = relform2->reltoastrelid;
1596 relform2->reltoastrelid = swaptemp;
1597 }
1598 }
1599 else
1600 {
1601 /*
1602 * Mapped-relation case. Here we have to swap the relation mappings
1603 * instead of modifying the pg_class columns. Both must be mapped.
1604 */
1607 elog(ERROR, "cannot swap mapped relation \"%s\" with non-mapped relation",
1608 NameStr(relform1->relname));
1609
1610 /*
1611 * We can't change the tablespace nor persistence of a mapped rel, and
1612 * we can't handle toast link swapping for one either, because we must
1613 * not apply any critical changes to its pg_class row. These cases
1614 * should be prevented by upstream permissions tests, so these checks
1615 * are a non-user-facing emergency backstop.
1616 */
1617 if (relform1->reltablespace != relform2->reltablespace)
1618 elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
1619 NameStr(relform1->relname));
1620 if (relform1->relpersistence != relform2->relpersistence)
1621 elog(ERROR, "cannot change persistence of mapped relation \"%s\"",
1622 NameStr(relform1->relname));
1623 if (relform1->relam != relform2->relam)
1624 elog(ERROR, "cannot change access method of mapped relation \"%s\"",
1625 NameStr(relform1->relname));
1626 if (!swap_toast_by_content &&
1627 (relform1->reltoastrelid || relform2->reltoastrelid))
1628 elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",
1629 NameStr(relform1->relname));
1630
1631 /*
1632 * Fetch the mappings --- shouldn't fail, but be paranoid
1633 */
1636 elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1637 NameStr(relform1->relname), r1);
1640 elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1641 NameStr(relform2->relname), r2);
1642
1643 /*
1644 * Send replacement mappings to relmapper. Note these won't actually
1645 * take effect until CommandCounterIncrement.
1646 */
1647 RelationMapUpdateMap(r1, relfilenumber2, relform1->relisshared, false);
1648 RelationMapUpdateMap(r2, relfilenumber1, relform2->relisshared, false);
1649
1650 /* Pass OIDs of mapped r2 tables back to caller */
1651 *mapped_tables++ = r2;
1652 }
1653
1654 /*
1655 * Recognize that rel1's relfilenumber (swapped from rel2) is new in this
1656 * subtransaction. The rel2 storage (swapped from rel1) may or may not be
1657 * new.
1658 */
1659 {
1660 Relation rel1,
1661 rel2;
1662
1665 rel2->rd_createSubid = rel1->rd_createSubid;
1666 rel2->rd_newRelfilelocatorSubid = rel1->rd_newRelfilelocatorSubid;
1667 rel2->rd_firstRelfilelocatorSubid = rel1->rd_firstRelfilelocatorSubid;
1671 }
1672
1673 /*
1674 * In the case of a shared catalog, these next few steps will only affect
1675 * our own database's pg_class row; but that's okay, because they are all
1676 * noncritical updates. That's also an important fact for the case of a
1677 * mapped catalog, because it's possible that we'll commit the map change
1678 * and then fail to commit the pg_class update.
1679 */
1680
1681 /* set rel1's frozen Xid and minimum MultiXid */
1682 if (relform1->relkind != RELKIND_INDEX)
1683 {
1686 relform1->relfrozenxid = frozenXid;
1687 relform1->relminmxid = cutoffMulti;
1688 }
1689
1690 /* swap size statistics too, since new rel has freshly-updated stats */
1691 {
1696
1697 swap_pages = relform1->relpages;
1698 relform1->relpages = relform2->relpages;
1699 relform2->relpages = swap_pages;
1700
1701 swap_tuples = relform1->reltuples;
1702 relform1->reltuples = relform2->reltuples;
1703 relform2->reltuples = swap_tuples;
1704
1705 swap_allvisible = relform1->relallvisible;
1706 relform1->relallvisible = relform2->relallvisible;
1707 relform2->relallvisible = swap_allvisible;
1708
1709 swap_allfrozen = relform1->relallfrozen;
1710 relform1->relallfrozen = relform2->relallfrozen;
1711 relform2->relallfrozen = swap_allfrozen;
1712 }
1713
1714 /*
1715 * Update the tuples in pg_class --- unless the target relation of the
1716 * swap is pg_class itself. In that case, there is zero point in making
1717 * changes because we'd be updating the old data that we're about to throw
1718 * away. Because the real work being done here for a mapped relation is
1719 * just to change the relation map settings, it's all right to not update
1720 * the pg_class rows in this case. The most important changes will instead
1721 * be performed later, in finish_heap_swap() itself.
1722 */
1723 if (!target_is_pg_class)
1724 {
1726
1729 indstate);
1731 indstate);
1733 }
1734 else
1735 {
1736 /* no update ... but we do still need relcache inval */
1739 }
1740
1741 /*
1742 * Now that pg_class has been updated with its relevant information for
1743 * the swap, update the dependency of the relations to point to their new
1744 * table AM, if it has changed.
1745 */
1746 if (relam1 != relam2)
1747 {
1749 r1,
1751 relam1,
1752 relam2) != 1)
1753 elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1755 get_rel_name(r1));
1757 r2,
1759 relam2,
1760 relam1) != 1)
1761 elog(ERROR, "could not change access method dependency for relation \"%s.%s\"",
1763 get_rel_name(r2));
1764 }
1765
1766 /*
1767 * Post alter hook for modified relations. The change to r2 is always
1768 * internal, but r1 depends on the invocation context.
1769 */
1771 InvalidOid, is_internal);
1773 InvalidOid, true);
1774
1775 /*
1776 * If we have toast tables associated with the relations being swapped,
1777 * deal with them too.
1778 */
1779 if (relform1->reltoastrelid || relform2->reltoastrelid)
1780 {
1782 {
1783 if (relform1->reltoastrelid && relform2->reltoastrelid)
1784 {
1785 /* Recursively swap the contents of the toast tables */
1786 swap_relation_files(relform1->reltoastrelid,
1787 relform2->reltoastrelid,
1790 is_internal,
1791 frozenXid,
1794 }
1795 else
1796 {
1797 /* caller messed up */
1798 elog(ERROR, "cannot swap toast files by content when there's only one");
1799 }
1800 }
1801 else
1802 {
1803 /*
1804 * We swapped the ownership links, so we need to change dependency
1805 * data to match.
1806 *
1807 * NOTE: it is possible that only one table has a toast table.
1808 *
1809 * NOTE: at present, a TOAST table's only dependency is the one on
1810 * its owning table. If more are ever created, we'd need to use
1811 * something more selective than deleteDependencyRecordsFor() to
1812 * get rid of just the link we want.
1813 */
1816 long count;
1817
1818 /*
1819 * We disallow this case for system catalogs, to avoid the
1820 * possibility that the catalog we're rebuilding is one of the
1821 * ones the dependency changes would change. It's too late to be
1822 * making any data changes to the target catalog.
1823 */
1825 elog(ERROR, "cannot swap toast files by links for system catalogs");
1826
1827 /* Delete old dependencies */
1828 if (relform1->reltoastrelid)
1829 {
1831 relform1->reltoastrelid,
1832 false);
1833 if (count != 1)
1834 elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1835 count);
1836 }
1837 if (relform2->reltoastrelid)
1838 {
1840 relform2->reltoastrelid,
1841 false);
1842 if (count != 1)
1843 elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1844 count);
1845 }
1846
1847 /* Register new dependencies */
1849 baseobject.objectSubId = 0;
1851 toastobject.objectSubId = 0;
1852
1853 if (relform1->reltoastrelid)
1854 {
1855 baseobject.objectId = r1;
1856 toastobject.objectId = relform1->reltoastrelid;
1859 }
1860
1861 if (relform2->reltoastrelid)
1862 {
1863 baseobject.objectId = r2;
1864 toastobject.objectId = relform2->reltoastrelid;
1867 }
1868 }
1869 }
1870
1871 /*
1872 * If we're swapping two toast tables by content, do the same for their
1873 * valid index. The swap can actually be safely done only if the relations
1874 * have indexes.
1875 */
1877 relform1->relkind == RELKIND_TOASTVALUE &&
1878 relform2->relkind == RELKIND_TOASTVALUE)
1879 {
1882
1883 /* Get valid index for each relation */
1888
1893 is_internal,
1897 }
1898
1899 /* Clean up. */
1902
1904}
1905
1906/*
1907 * Remove the transient table that was built by make_new_heap, and finish
1908 * cleaning up (including rebuilding all indexes on the old heap).
1909 */
1910void
1912 bool is_system_catalog,
1914 bool check_constraints,
1915 bool is_internal,
1916 bool reindex,
1919 char newrelpersistence)
1920{
1921 ObjectAddress object;
1922 Oid mapped_tables[4];
1923 int i;
1924
1925 /* Report that we are now swapping relation files */
1928
1929 /* Zero out possible results from swap_relation_files */
1930 memset(mapped_tables, 0, sizeof(mapped_tables));
1931
1932 /*
1933 * Swap the contents of the heap relations (including any toast tables).
1934 * Also set old heap's relfrozenxid to frozenXid.
1935 */
1938 swap_toast_by_content, is_internal,
1940
1941 /*
1942 * If it's a system catalog, queue a sinval message to flush all catcaches
1943 * on the catalog when we reach CommandCounterIncrement.
1944 */
1947
1948 if (reindex)
1949 {
1950 int reindex_flags;
1952
1953 /*
1954 * Rebuild each index on the relation (but not the toast table, which
1955 * is all-new at this point). It is important to do this before the
1956 * DROP step because if we are processing a system catalog that will
1957 * be used during DROP, we want to have its indexes available. There
1958 * is no advantage to the other order anyway because this is all
1959 * transactional, so no chance to reclaim disk space before commit. We
1960 * do not need a final CommandCounterIncrement() because
1961 * reindex_relation does it.
1962 *
1963 * Note: because index_build is called via reindex_relation, it will
1964 * never set indcheckxmin true for the indexes. This is OK even
1965 * though in some sense we are building new indexes rather than
1966 * rebuilding existing ones, because the new heap won't contain any
1967 * HOT chains at all, let alone broken ones, so it can't be necessary
1968 * to set indcheckxmin.
1969 */
1973
1974 /*
1975 * Ensure that the indexes have the same persistence as the parent
1976 * relation.
1977 */
1978 if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1980 else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1982
1983 /* Report that we are now reindexing relations */
1986
1988 }
1989
1990 /* Report that we are now doing clean up */
1993
1994 /*
1995 * If the relation being rebuilt is pg_class, swap_relation_files()
1996 * couldn't update pg_class's own pg_class entry (check comments in
1997 * swap_relation_files()), thus relfrozenxid was not updated. That's
1998 * annoying because a potential reason for doing a VACUUM FULL is an
1999 * imminent or actual anti-wraparound shutdown. So, now that we can
2000 * access the new relation using its indices, update relfrozenxid.
2001 * pg_class doesn't have a toast relation, so we don't need to update the
2002 * corresponding toast relation. Note that there's little point moving all
2003 * relfrozenxid updates here since swap_relation_files() needs to write to
2004 * pg_class for non-mapped relations anyway.
2005 */
2007 {
2011
2013
2016 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
2018
2019 relform->relfrozenxid = frozenXid;
2020 relform->relminmxid = cutoffMulti;
2021
2023
2025 }
2026
2027 /* Destroy new heap with old filenumber */
2028 object.classId = RelationRelationId;
2029 object.objectId = OIDNewHeap;
2030 object.objectSubId = 0;
2031
2032 if (!reindex)
2033 {
2034 /*
2035 * Make sure the changes in pg_class are visible. This is especially
2036 * important if !swap_toast_by_content, so that the correct TOAST
2037 * relation is dropped. (reindex_relation() above did not help in this
2038 * case.)
2039 */
2041 }
2042
2043 /*
2044 * The new relation is local to our transaction and we know nothing
2045 * depends on it, so DROP_RESTRICT should be OK.
2046 */
2048
2049 /* performDeletion does CommandCounterIncrement at end */
2050
2051 /*
2052 * Now we must remove any relation mapping entries that we set up for the
2053 * transient table, as well as its toast table and toast index if any. If
2054 * we fail to do this before commit, the relmapper will complain about new
2055 * permanent map entries being added post-bootstrap.
2056 */
2057 for (i = 0; OidIsValid(mapped_tables[i]); i++)
2059
2060 /*
2061 * At this point, everything is kosher except that, if we did toast swap
2062 * by links, the toast table's name corresponds to the transient table.
2063 * The name is irrelevant to the backend because it's referenced by OID,
2064 * but users looking at the catalogs could be confused. Rename it to
2065 * prevent this problem.
2066 *
2067 * Note no lock required on the relation, because we already hold an
2068 * exclusive lock on it.
2069 */
2071 {
2073
2075 if (OidIsValid(newrel->rd_rel->reltoastrelid))
2076 {
2077 Oid toastidx;
2079
2080 /* Get the associated valid index to be renamed */
2081 toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2083
2084 /* rename the toast table ... */
2085 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2086 OIDOldHeap);
2087 RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2088 NewToastName, true, false);
2089
2090 /* ... and its valid index too. */
2091 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2092 OIDOldHeap);
2093
2095 NewToastName, true, true);
2096
2097 /*
2098 * Reset the relrewrite for the toast. The command-counter
2099 * increment is required here as we are about to update the tuple
2100 * that is updated as part of RenameRelationInternal.
2101 */
2103 ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2104 }
2106 }
2107
2108 /* if it's not a catalog table, clear any missing attribute settings */
2109 if (!is_system_catalog)
2110 {
2112
2116 }
2117}
2118
2119/*
2120 * Determine which relations to process, when REPACK/CLUSTER is called
2121 * without specifying a table name. The exact process depends on whether
2122 * USING INDEX was given or not, and in any case we only return tables and
2123 * materialized views that the current user has privileges to repack/cluster.
2124 *
2125 * If USING INDEX was given, we scan pg_index to find those that have
2126 * indisclustered set; if it was not given, scan pg_class and return all
2127 * tables.
2128 *
2129 * Return it as a list of RelToCluster in the given memory context.
2130 */
2131static List *
2133{
2135 TableScanDesc scan;
2136 HeapTuple tuple;
2137 List *rtcs = NIL;
2138
2139 if (usingindex)
2140 {
2141 ScanKeyData entry;
2142
2143 /*
2144 * For USING INDEX, scan pg_index to find those with indisclustered.
2145 */
2147 ScanKeyInit(&entry,
2150 BoolGetDatum(true));
2151 scan = table_beginscan_catalog(catalog, 1, &entry);
2152 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2153 {
2159
2160 index = (Form_pg_index) GETSTRUCT(tuple);
2161
2162 /*
2163 * Try to obtain a light lock on the index's table, to ensure it
2164 * doesn't go away while we collect the list. If we cannot, just
2165 * disregard it. Be sure to release this if we ultimately decide
2166 * not to process the table!
2167 */
2169 continue;
2170
2171 /* Verify that the table still exists; skip if not */
2174 {
2176 continue;
2177 }
2179
2180 /* Skip temp relations belonging to other sessions */
2181 if (classForm->relpersistence == RELPERSISTENCE_TEMP &&
2182 !isTempOrTempToastNamespace(classForm->relnamespace))
2183 {
2186 continue;
2187 }
2188
2190
2191 /* noisily skip rels which the user can't process */
2192 if (!repack_is_permitted_for_relation(cmd, index->indrelid,
2193 GetUserId()))
2194 {
2196 continue;
2197 }
2198
2199 /* Use a permanent memory context for the result list */
2202 rtc->tableOid = index->indrelid;
2203 rtc->indexOid = index->indexrelid;
2204 rtcs = lappend(rtcs, rtc);
2206 }
2207 }
2208 else
2209 {
2212
2213 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2214 {
2216 Form_pg_class class;
2218
2219 class = (Form_pg_class) GETSTRUCT(tuple);
2220
2221 /*
2222 * Try to obtain a light lock on the table, to ensure it doesn't
2223 * go away while we collect the list. If we cannot, just
2224 * disregard the table. Be sure to release this if we ultimately
2225 * decide not to process the table!
2226 */
2228 continue;
2229
2230 /* Verify that the table still exists */
2232 {
2234 continue;
2235 }
2236
2237 /* Can only process plain tables and matviews */
2238 if (class->relkind != RELKIND_RELATION &&
2239 class->relkind != RELKIND_MATVIEW)
2240 {
2242 continue;
2243 }
2244
2245 /* Skip temp relations belonging to other sessions */
2246 if (class->relpersistence == RELPERSISTENCE_TEMP &&
2247 !isTempOrTempToastNamespace(class->relnamespace))
2248 {
2250 continue;
2251 }
2252
2253 /* noisily skip rels which the user can't process */
2255 GetUserId()))
2256 {
2258 continue;
2259 }
2260
2261 /* Use a permanent memory context for the result list */
2264 rtc->tableOid = class->oid;
2265 rtc->indexOid = InvalidOid;
2266 rtcs = lappend(rtcs, rtc);
2268 }
2269 }
2270
2271 table_endscan(scan);
2273
2274 return rtcs;
2275}
2276
2277/*
2278 * Given a partitioned table or its index, return a list of RelToCluster for
2279 * all the leaf child tables/indexes.
2280 *
2281 * 'rel_is_index' tells whether 'relid' is that of an index (true) or of the
2282 * owning relation.
2283 */
2284static List *
2287{
2288 List *inhoids;
2289 List *rtcs = NIL;
2290
2291 /*
2292 * Do not lock the children until they're processed. Note that we do hold
2293 * a lock on the parent partitioned table.
2294 */
2297 {
2298 Oid table_oid,
2299 index_oid;
2302
2303 if (rel_is_index)
2304 {
2305 /* consider only leaf indexes */
2307 continue;
2308
2311 }
2312 else
2313 {
2314 /* consider only leaf relations */
2316 continue;
2317
2320 }
2321
2322 /*
2323 * It's possible that the user does not have privileges to CLUSTER the
2324 * leaf partition despite having them on the partitioned table. Skip
2325 * if so.
2326 */
2328 continue;
2329
2330 /* Use a permanent memory context for the result list */
2333 rtc->tableOid = table_oid;
2334 rtc->indexOid = index_oid;
2335 rtcs = lappend(rtcs, rtc);
2337 }
2338
2339 return rtcs;
2340}
2341
2342
2343/*
2344 * Return whether userid has privileges to REPACK relid. If not, this
2345 * function emits a WARNING.
2346 */
2347static bool
2349{
2351
2352 if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK)
2353 return true;
2354
2356 errmsg("permission denied to execute %s on \"%s\", skipping it",
2358 get_rel_name(relid)));
2359
2360 return false;
2361}
2362
2363
2364/*
2365 * Given a RepackStmt with an indicated relation name, resolve the relation
2366 * name, obtain lock on it, then determine what to do based on the relation
2367 * type: if it's a table and not partitioned, repack it as indicated (using an
2368 * existing clustered index, or following the given one), and return NULL.
2369 *
2370 * On the other hand, if the table is partitioned, do nothing further and
2371 * instead return the opened and locked relcache entry, so that caller can
2372 * process the partitions using the multiple-table handling code. In this
2373 * case, if an index name is given, it's up to the caller to resolve it.
2374 */
2375static Relation
2377 ClusterParams *params)
2378{
2379 Relation rel;
2380 Oid tableOid;
2381
2382 Assert(stmt->relation != NULL);
2383 Assert(stmt->command == REPACK_COMMAND_CLUSTER ||
2384 stmt->command == REPACK_COMMAND_REPACK);
2385
2386 /*
2387 * Make sure ANALYZE is specified if a column list is present.
2388 */
2389 if ((params->options & CLUOPT_ANALYZE) == 0 && stmt->relation->va_cols != NIL)
2390 ereport(ERROR,
2392 errmsg("ANALYZE option must be specified when a column list is provided"));
2393
2394 /* Find, lock, and check permissions on the table. */
2395 tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
2396 lockmode,
2397 0,
2399 NULL);
2400 rel = table_open(tableOid, NoLock);
2401
2402 /*
2403 * Reject clustering a remote temp table ... their local buffer manager is
2404 * not going to cope.
2405 */
2406 if (RELATION_IS_OTHER_TEMP(rel))
2407 ereport(ERROR,
2409 /*- translator: first %s is name of a SQL command, eg. REPACK */
2410 errmsg("cannot execute %s on temporary tables of other sessions",
2411 RepackCommandAsString(stmt->command)));
2412
2413 /*
2414 * For partitioned tables, let caller handle this. Otherwise, process it
2415 * here and we're done.
2416 */
2417 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2418 return rel;
2419 else
2420 {
2421 Oid indexOid = InvalidOid;
2422
2423 indexOid = determine_clustered_index(rel, stmt->usingindex,
2424 stmt->indexname);
2425 if (OidIsValid(indexOid))
2426 check_index_is_clusterable(rel, indexOid, lockmode);
2427
2428 cluster_rel(stmt->command, rel, indexOid, params, isTopLevel);
2429
2430 /*
2431 * Do an analyze, if requested. We close the transaction and start a
2432 * new one, so that we don't hold the stronger lock for longer than
2433 * needed.
2434 */
2435 if (params->options & CLUOPT_ANALYZE)
2436 {
2438
2441
2444
2445 vac_params.options |= VACOPT_ANALYZE;
2446 if (params->options & CLUOPT_VERBOSE)
2447 vac_params.options |= VACOPT_VERBOSE;
2448 analyze_rel(tableOid, NULL, &vac_params,
2449 stmt->relation->va_cols, true, NULL);
2452 }
2453
2454 return NULL;
2455 }
2456}
2457
2458/*
2459 * Given a relation and the usingindex/indexname options in a
2460 * REPACK USING INDEX or CLUSTER command, return the OID of the
2461 * index to use for clustering the table.
2462 *
2463 * Caller must hold lock on the relation so that the set of indexes
2464 * doesn't change, and must call check_index_is_clusterable.
2465 */
2466static Oid
2467determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
2468{
2469 Oid indexOid;
2470
2471 if (indexname == NULL && usingindex)
2472 {
2473 /*
2474 * If USING INDEX with no name is given, find a clustered index, or
2475 * error out if none.
2476 */
2477 indexOid = InvalidOid;
2479 {
2481 {
2482 indexOid = idxoid;
2483 break;
2484 }
2485 }
2486
2487 if (!OidIsValid(indexOid))
2488 ereport(ERROR,
2490 errmsg("there is no previously clustered index for table \"%s\"",
2492 }
2493 else if (indexname != NULL)
2494 {
2495 /* An index was specified; obtain its OID. */
2496 indexOid = get_relname_relid(indexname, rel->rd_rel->relnamespace);
2497 if (!OidIsValid(indexOid))
2498 ereport(ERROR,
2500 errmsg("index \"%s\" for table \"%s\" does not exist",
2501 indexname, RelationGetRelationName(rel)));
2502 }
2503 else
2504 indexOid = InvalidOid;
2505
2506 return indexOid;
2507}
2508
/*
 * Return the SQL command name corresponding to 'cmd' as a constant string:
 * "REPACK", "VACUUM" or "CLUSTER". A value not handled by the switch yields
 * "???".
 */
2509static const char *
2511{
2512 switch (cmd)
2513 {
2515 return "REPACK";
2517 return "VACUUM";
2519 return "CLUSTER";
2520 }
2521 return "???"; /* keep compiler quiet */
2522}
2523
2524/*
2525 * Apply all the changes stored in 'file'.
2526 */
2527static void
2529{
2530 ConcurrentChangeKind kind = '\0';
2531 Relation rel = chgcxt->cc_rel;
2535 bool have_old_tuple = false;
2537
2539 &TTSOpsVirtual);
2543 &TTSOpsVirtual);
2544
2546
2547 while (true)
2548 {
2549 size_t nread;
2551
2553
2554 nread = BufFileReadMaybeEOF(file, &kind, 1, true);
2555 if (nread == 0) /* done with the file? */
2556 break;
2557
2558 /*
2559 * If this is the old tuple for an update, read it into the tuple slot
2560 * and go to the next one. The update itself will be executed on the
2561 * next iteration, when we receive the NEW tuple.
2562 */
2563 if (kind == CHANGE_UPDATE_OLD)
2564 {
2565 restore_tuple(file, rel, old_update_tuple);
2566 have_old_tuple = true;
2567 continue;
2568 }
2569
2570 /*
2571 * Just before an UPDATE or DELETE, we must update the command
2572 * counter, because the change could refer to a tuple that we have
2573 * just inserted; and before an INSERT, we have to do this also if the
2574 * previous command was either update or delete.
2575 *
2576 * With this approach we don't spend so many CCIs for long strings of
2577 * only INSERTs, which can't affect one another.
2578 */
2579 if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE ||
2580 (kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW ||
2582 {
2585 }
2586
2587 /*
2588 * Now restore the tuple into the slot and execute the change.
2589 */
2590 restore_tuple(file, rel, spilled_tuple);
2591
2592 if (kind == CHANGE_INSERT)
2593 {
2595 }
2596 else if (kind == CHANGE_DELETE)
2597 {
2598 bool found;
2599
2600 /* Find the tuple to be deleted */
2602 if (!found)
2603 elog(ERROR, "could not find target tuple");
2605 }
2606 else if (kind == CHANGE_UPDATE_NEW)
2607 {
2608 TupleTableSlot *key;
2609 bool found;
2610
2611 if (have_old_tuple)
2612 key = old_update_tuple;
2613 else
2614 key = spilled_tuple;
2615
2616 /* Find the tuple to be updated. */
2617 found = find_target_tuple(rel, chgcxt, key, ondisk_tuple);
2618 if (!found)
2619 elog(ERROR, "could not find target tuple");
2620
2621 /*
2622 * If the spilled tuple contains TOAST pointers, they point to the old
2623 * relation's toast. Copy the corresponding TOAST pointers for the
2624 * new relation from the existing tuple. (The fact that we
2625 * received a TOAST pointer here implies that the attribute hasn't
2626 * changed.)
2627 */
2629
2631
2633 have_old_tuple = false;
2634 }
2635 else
2636 elog(ERROR, "unrecognized kind of change: %d", kind);
2637
2638 ResetPerTupleExprContext(chgcxt->cc_estate);
2639 }
2640
2641 /* Cleanup. */
2645
2647}
2648
2649/*
2650 * Apply an insert from the spill of concurrent changes to the new copy of the
2651 * table.
2652 */
2653static void
2656{
2657 /* Put the tuple in the table, but make sure it won't be decoded */
2658 table_tuple_insert(rel, slot, GetCurrentCommandId(true),
2660
2661 /* Update indexes with this new tuple. */
2663 chgcxt->cc_estate,
2664 0,
2665 slot,
2666 NIL, NULL);
2668}
2669
2670/*
2671 * Apply an update from the spill of concurrent changes to the new copy of the
2672 * table.
2673 */
2674static void
2678{
2679 LockTupleMode lockmode;
2680 TM_FailureData tmfd;
2682 TM_Result res;
2683
2684 /*
2685 * Carry out the update, skipping logical decoding for it.
2686 */
2687 res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple,
2688 GetCurrentCommandId(true),
2692 false,
2693 &tmfd, &lockmode, &update_indexes);
2694 if (res != TM_Ok)
2695 ereport(ERROR,
2697 errmsg("could not apply concurrent %s on relation \"%s\"",
2698 "UPDATE", RelationGetRelationName(rel)));
2699
2700 if (update_indexes != TU_None)
2701 {
2702 uint32 flags = EIIT_IS_UPDATE;
2703
2705 flags |= EIIT_ONLY_SUMMARIZING;
2707 chgcxt->cc_estate,
2708 flags,
2710 NIL, NULL);
2711 }
2712
2714}
2715
/*
 * Apply a delete from the spill of concurrent changes to the new copy of the
 * table.
 *
 * The tuple to remove is identified by slot->tts_tid. Any result other than
 * TM_Ok from the table AM is reported as an ERROR.
 */
2716static void
2718{
2719 TM_Result res;
2720 TM_FailureData tmfd;
2721
2722 /*
2723 * Delete tuple from the new heap, skipping logical decoding for it.
2724 */
2725 res = table_tuple_delete(rel, &(slot->tts_tid),
2726 GetCurrentCommandId(true),
2729 false,
2730 &tmfd);
2731
2732 if (res != TM_Ok)
2733 ereport(ERROR,
2735 errmsg("could not apply concurrent %s on relation \"%s\"",
2736 "DELETE", RelationGetRelationName(rel)));
2737
2739}
2740
2741/*
2742 * Read tuple from file and put it in the input slot. All memory is allocated
2743 * in the current memory context; caller is responsible for freeing it as
2744 * appropriate.
2745 *
2746 * External attributes are stored in separate memory chunks, in order to avoid
2747 * exceeding MaxAllocSize - that could happen if the individual attributes are
2748 * smaller than MaxAllocSize but the whole tuple is bigger.
2749 */
2750static void
2752{
2753 uint32 t_len;
2754 HeapTuple tup;
2755 int natt_ext;
2756
2757 /* Read the tuple. */
2758 BufFileReadExact(file, &t_len, sizeof(t_len));
2759 tup = (HeapTuple) palloc(HEAPTUPLESIZE + t_len);
2760 tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
2761 BufFileReadExact(file, tup->t_data, t_len);
2762 tup->t_len = t_len;
2763 ItemPointerSetInvalid(&tup->t_self);
2764 tup->t_tableOid = RelationGetRelid(relation);
2765
2766 /*
2767 * Put the tuple we read in a slot. This deforms it, so that we can hack
2768 * the external attributes in place.
2769 */
2770 ExecForceStoreHeapTuple(tup, slot, false);
2771
2772 /*
2773 * Next, read any attributes we stored separately into the tts_values
2774 * array elements expecting them, if any. This matches
2775 * repack_store_change.
2776 */
2777 BufFileReadExact(file, &natt_ext, sizeof(natt_ext));
2778 if (natt_ext > 0)
2779 {
2780 TupleDesc desc = slot->tts_tupleDescriptor;
2781
2782 for (int i = 0; i < desc->natts; i++)
2783 {
2785 varlena *varlen;
2787 void *value;
2788 Size varlensz;
2789
2790 if (attr->attisdropped || attr->attlen != -1)
2791 continue;
2792 if (slot_attisnull(slot, i + 1))
2793 continue;
2796 continue;
2797 slot_getsomeattrs(slot, i + 1);
2798
2801
2804 BufFileReadExact(file, (char *) value + VARHDRSZ, varlensz - VARHDRSZ);
2805
2807 natt_ext--;
2808 if (natt_ext < 0)
2809 ereport(ERROR,
2811 errmsg("insufficient number of attributes stored separately"));
2812 }
2813 }
2814}
2815
2816/*
2817 * Adjust 'dest' replacing any EXTERNAL_ONDISK toast pointers with the
2818 * corresponding ones from 'src'.
2819 */
2820static void
2822{
2823 TupleDesc desc = dest->tts_tupleDescriptor;
2824
2825 for (int i = 0; i < desc->natts; i++)
2826 {
2829
2830 if (attr->attisdropped)
2831 continue;
2832 if (attr->attlen != -1)
2833 continue;
2834 if (slot_attisnull(dest, i + 1))
2835 continue;
2836
2837 slot_getsomeattrs(dest, i + 1);
2838
2839 varlena_dst = (varlena *) DatumGetPointer(dest->tts_values[i]);
2841 continue;
2842 slot_getsomeattrs(src, i + 1);
2843
2844 dest->tts_values[i] = src->tts_values[i];
2845 }
2846}
2847
2848/*
2849 * Find the tuple to be updated or deleted by the given data change, whose
2850 * tuple has already been loaded into locator.
2851 *
2852 * If the tuple is found, put it in retrieved and return true. If the tuple is
2853 * not found, return false.
2854 */
2855static bool
2858{
2859 Form_pg_index idx = chgcxt->cc_ident_index->rd_index;
2860 IndexScanDesc scan;
2861 bool retval = false;
2862
2863 /*
2864 * Scan key is passed by caller, so it does not have to be constructed
2865 * multiple times. Key entries have all fields initialized, except for
2866 * sk_argument.
2867 *
2868 * Use the incoming tuple to finalize the scan key.
2869 */
2870 for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
2871 {
2872 ScanKey entry = &chgcxt->cc_ident_key[i];
2873 AttrNumber attno = idx->indkey.values[i];
2874
2875 entry->sk_argument = locator->tts_values[attno - 1];
2876 Assert(!locator->tts_isnull[attno - 1]);
2877 }
2878
2879 /* XXX no instrumentation for now */
2880 scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(),
2881 NULL, chgcxt->cc_ident_key_nentries, 0, 0);
2882 index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0);
2884 {
2885 /* Be wary of temporal constraints */
2886 if (scan->xs_recheck && !identity_key_equal(chgcxt, locator, retrieved))
2887 {
2889 continue;
2890 }
2891
2892 retval = true;
2893 break;
2894 }
2895 index_endscan(scan);
2896
2897 return retval;
2898}
2899
2900/*
2901 * Check whether the candidate tuple matches the locator tuple on all replica
2902 * identity key columns, using the same equality operators as the identity
2903 * index scan. The locator tuple has already been loaded into cc_ident_key.
2904 *
2905 * This is needed to filter lossy index matches, such as GiST multirange scans
2906 * used for temporal constraints.
2907 */
2908static bool
2911{
2912 slot_getsomeattrs(locator, chgcxt->cc_last_key_attno);
2913 slot_getsomeattrs(candidate, chgcxt->cc_last_key_attno);
2914
2915 for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
2916 {
2917 ScanKey entry = &chgcxt->cc_ident_key[i];
2918 AttrNumber attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
2919
2920 Assert(attno > 0);
2921
2922 if (locator->tts_isnull[attno - 1] != candidate->tts_isnull[attno - 1])
2923 return false;
2924
2925 if (locator->tts_isnull[attno - 1])
2926 continue;
2927
2929 entry->sk_collation,
2930 candidate->tts_values[attno - 1],
2931 entry->sk_argument)))
2932 return false;
2933 }
2934
2935 return true;
2936}
2937
2938/*
2939 * Decode and apply concurrent changes, up to (and including) the record whose
2940 * LSN is 'end_of_wal'.
2941 *
2942 * XXX the names "process_concurrent_changes" and "apply_concurrent_changes"
2943 * are far too similar to each other.
2944 */
2945static void
2947{
2948 DecodingWorkerShared *shared;
2949 char fname[MAXPGPATH];
2950 BufFile *file;
2951
2954
2955 /* Ask the worker for the file. */
2957 SpinLockAcquire(&shared->mutex);
2958 shared->lsn_upto = end_of_wal;
2959 shared->done = done;
2960 SpinLockRelease(&shared->mutex);
2961
2962 /*
2963 * The worker needs to finish processing of the current WAL record. Even
2964 * if it's idle, it'll need to close the output file. Thus we're likely to
2965 * wait, so prepare for sleep.
2966 */
2968 for (;;)
2969 {
2970 int last_exported;
2971
2972 SpinLockAcquire(&shared->mutex);
2973 last_exported = shared->last_exported;
2974 SpinLockRelease(&shared->mutex);
2975
2976 /*
2977 * Has the worker exported the file we are waiting for?
2978 */
2979 if (last_exported == chgcxt->cc_file_seq)
2980 break;
2981
2983 }
2985
2986 /* Open the file. */
2987 DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
2988 file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
2990
2991 BufFileClose(file);
2992
2993 /* Get ready for the next file. */
2994 chgcxt->cc_file_seq++;
2995}
2996
2997/*
2998 * Initialize the ChangeContext struct for the given relation, with
2999 * the given index as identity index.
3000 */
3001static void
3003 Relation relation, Oid ident_index_id)
3004{
3005 chgcxt->cc_rel = relation;
3006
3007 /* Only initialize fields needed by ExecInsertIndexTuples(). */
3008 chgcxt->cc_estate = CreateExecutorState();
3009
3010 chgcxt->cc_rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
3011 InitResultRelInfo(chgcxt->cc_rri, relation, 0, 0, 0);
3012 ExecOpenIndices(chgcxt->cc_rri, false);
3013
3014 /*
3015 * The table's relcache entry already has the relcache entry for the
3016 * identity index; find that.
3017 */
3018 chgcxt->cc_ident_index = NULL;
3019 for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++)
3020 {
3022
3023 ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i];
3024 if (ind_rel->rd_id == ident_index_id)
3025 {
3026 chgcxt->cc_ident_index = ind_rel;
3027 break;
3028 }
3029 }
3030 if (chgcxt->cc_ident_index == NULL)
3031 elog(ERROR, "could not find identity index");
3032
3033 /* Set up for scanning said identity index */
3034 {
3036
3037 indexForm = chgcxt->cc_ident_index->rd_index;
3038 chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts;
3039 chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
3040 for (int i = 0; i < indexForm->indnkeyatts; i++)
3041 {
3042 ScanKey entry;
3043 Oid opfamily,
3044 opcintype,
3045 opno,
3046 opcode;
3048
3049 entry = &chgcxt->cc_ident_key[i];
3050
3051 opfamily = chgcxt->cc_ident_index->rd_opfamily[i];
3052 opcintype = chgcxt->cc_ident_index->rd_opcintype[i];
3054 chgcxt->cc_ident_index->rd_rel->relam,
3055 opfamily, false);
3057 elog(ERROR, "could not find equality strategy for index operator family %u for type %u",
3058 opfamily, opcintype);
3059 opno = get_opfamily_member(opfamily, opcintype, opcintype,
3060 eq_strategy);
3061 if (!OidIsValid(opno))
3062 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
3063 eq_strategy, opcintype, opcintype, opfamily);
3064 opcode = get_opcode(opno);
3065 if (!OidIsValid(opcode))
3066 elog(ERROR, "missing oprcode for operator %u", opno);
3067
3068 /* Initialize everything but argument. */
3069 ScanKeyInit(entry,
3070 i + 1,
3071 eq_strategy, opcode,
3072 (Datum) 0);
3073 entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i];
3074 }
3075 }
3076
3077 /* Determine the last column we must deform to read the identity */
3078 chgcxt->cc_last_key_attno = InvalidAttrNumber;
3079 for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
3080 {
3081 AttrNumber attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
3082
3083 Assert(attno > 0);
3084 chgcxt->cc_last_key_attno = Max(chgcxt->cc_last_key_attno, attno);
3085 }
3086
3087 chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
3088}
3089
3090/*
3091 * Free up resources taken by a ChangeContext.
3092 */
3093static void
3095{
3096 ExecCloseIndices(chgcxt->cc_rri);
3097 FreeExecutorState(chgcxt->cc_estate);
3098 /* XXX are these pfrees necessary? */
3099 pfree(chgcxt->cc_rri);
3100 pfree(chgcxt->cc_ident_key);
3101}
3102
3103/*
3104 * The final steps of rebuild_relation() for concurrent processing.
3105 *
3106 * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its
3107 * clustering index (if one is passed) are still locked in a mode that allows
3108 * concurrent data changes. On exit, both tables and their indexes are closed,
3109 * but locked in AccessExclusiveLock mode.
3110 */
3111static void
3115{
3120 ListCell *lc,
3121 *lc2;
3122 char relpersistence;
3123 bool is_system_catalog;
3125 XLogRecPtr end_of_wal;
3126 List *indexrels;
3128
3131
3132 /*
3133 * Unlike the exclusive case, we build new indexes for the new relation
3134 * rather than swapping the storage and reindexing the old relation. The
3135 * point is that the index build can take some time, so we do it before we
3136 * get AccessExclusiveLock on the old heap and therefore we cannot swap
3137 * the heap storage yet.
3138 *
3139 * index_create() will lock the new indexes using AccessExclusiveLock - no
3140 * need to change that. At the same time, we use ShareUpdateExclusiveLock
3141 * to lock the existing indexes - that should be enough to prevent others
3142 * from changing them while we're repacking the relation. The lock on
3143 * table should prevent others from changing the index column list, but
3144 * might not be enough for commands like ALTER INDEX ... SET ... (Those
3145 * are not necessarily dangerous, but can confuse users if the
3146 * changes they make get lost due to REPACK.)
3147 */
3149
3150 /*
3151 * The identity index in the new relation appears in the same relative
3152 * position as the corresponding index in the old relation. Find it.
3153 */
3156 {
3157 if (identIdx == ind_old)
3158 {
3159 int pos = foreach_current_index(ind_old);
3160
3161 if (list_length(ind_oids_new) <= pos)
3162 elog(ERROR, "list of new indexes too short");
3164 break;
3165 }
3166 }
3168 elog(ERROR, "could not find index matching \"%s\" at the new relation",
3170
3171 /* Gather information to apply concurrent changes. */
3173
3174 /*
3175 * During testing, wait for another backend to perform concurrent data
3176 * changes which we will process below.
3177 */
3178 INJECTION_POINT("repack-concurrently-before-lock", NULL);
3179
3180 /*
3181 * Flush all WAL records inserted so far (possibly except for the last
3182 * incomplete page; see GetInsertRecPtr), to minimize the amount of data
3183 * we need to flush while holding exclusive lock on the source table.
3184 */
3186 end_of_wal = GetFlushRecPtr(NULL);
3187
3188 /*
3189 * Apply concurrent changes first time, to minimize the time we need to
3190 * hold AccessExclusiveLock. (Quite some amount of WAL could have been
3191 * written during the data copying and index creation.)
3192 */
3193 process_concurrent_changes(end_of_wal, &chgcxt, false);
3194
3195 /*
3196 * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
3197 * is one), all its indexes, so that we can swap the files.
3198 */
3200
3201 /*
3202 * Lock all indexes now, not only the clustering one: all indexes need to
3203 * have their files swapped. While doing that, store their relation
3204 * references in the indexrels list, to handle predicate locks below.
3205 */
3206 indexrels = NIL;
3208 {
3210
3212
3213 /*
3214 * Some things about the index may have changed before we locked the
3215 * index, such as ALTER INDEX RENAME. We don't need to do anything
3216 * here to absorb those changes in the new index.
3217 */
3219 }
3220
3221 /*
3222 * Lock the OldHeap's TOAST relation exclusively - again, the lock is
3223 * needed to swap the files.
3224 */
3225 if (OidIsValid(OldHeap->rd_rel->reltoastrelid))
3226 LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
3227
3228 /*
3229 * Tuples and pages of the old heap will be gone, but the heap will stay.
3230 */
3233 {
3236 }
3238
3239 /*
3240 * Flush WAL again, to make sure that all changes committed while we were
3241 * waiting for the exclusive lock are available for decoding.
3242 */
3244 end_of_wal = GetFlushRecPtr(NULL);
3245
3246 /*
3247 * Apply the concurrent changes again. Indicate that the decoding worker
3248 * won't be needed anymore.
3249 */
3250 process_concurrent_changes(end_of_wal, &chgcxt, true);
3251
3252 /* Remember info about rel before closing OldHeap */
3253 relpersistence = OldHeap->rd_rel->relpersistence;
3255
3258
3259 /*
3260 * Even ShareUpdateExclusiveLock should have prevented others from
3261 * creating / dropping indexes (even using the CONCURRENTLY option), so we
3262 * do not need to check whether the lists match.
3263 */
3265 {
3268 Oid mapped_tables[4] = {0};
3269
3272 false, /* swap_toast_by_content */
3273 true,
3277
3278#ifdef USE_ASSERT_CHECKING
3279
3280 /*
3281 * Concurrent processing is not supported for system relations, so
3282 * there should be no mapped tables.
3283 */
3284 for (int i = 0; i < 4; i++)
3286#endif
3287 }
3288
3289 /* The new indexes must be visible for deletion. */
3291
3292 /* Close the old heap but keep lock until transaction commit. */
3294 /* Close the new heap. (We didn't have to open its indexes). */
3296
3297 /* Cleanup what we don't need anymore. (And close the identity index.) */
3299
3300 /*
3301 * Swap the relations and their TOAST relations and TOAST indexes. This
3302 * also drops the new relation and its indexes.
3303 *
3304 * (System catalogs are currently not supported.)
3305 */
3309 false, /* swap_toast_by_content */
3310 false,
3311 true,
3312 false, /* reindex */
3314 relpersistence);
3315}
3316
3317/*
3318 * Build indexes on NewHeap according to those on OldHeap.
3319 *
3320 * OldIndexes is the list of index OIDs on OldHeap. The contained indexes end
3321 * up locked using ShareUpdateExclusiveLock.
3322 *
3323 * A list of OIDs of the corresponding indexes created on NewHeap is
3324 * returned. The order of items does match, so we can use these lists to swap
3325 * index storage.
3326 */
3327static List *
3359
3360/*
3361 * Create a transient copy of a constraint -- supported by a transient
3362 * copy of the index that supports the original constraint.
3363 *
3364 * When repacking a table that contains exclusion constraints, the executor
3365 * relies on these constraints being properly catalogued. These copies are
3366 * to support that.
3367 *
3368 * We don't need the constraints for anything else (the original constraints
3369 * will be there once repack completes), so we add pg_depend entries so that
3370 * they are dropped when the transient table is dropped.
3371 */
3372static void
3374{
3376 Relation rel;
3377 TupleDesc desc;
3378 SysScanDesc scan;
3379 HeapTuple tup;
3381
3384
3385 /*
3386 * Retrieve the constraints supported by the old index and create an
3387 * identical one that points to the new index.
3388 */
3392 ObjectIdGetDatum(old_index->rd_index->indrelid));
3394 NULL, 1, &skey);
3395 desc = RelationGetDescr(rel);
3396 while (HeapTupleIsValid(tup = systable_getnext(scan)))
3397 {
3399 Oid oid;
3401 bool nulls[Natts_pg_constraint] = {0};
3402 bool replaces[Natts_pg_constraint] = {0};
3405
3406 if (conform->conindid != RelationGetRelid(old_index))
3407 continue;
3408
3412 replaces[Anum_pg_constraint_oid - 1] = true;
3417
3418 new_tup = heap_modify_tuple(tup, desc, values, nulls, replaces);
3419
3420 /* Insert it into the catalog. */
3422
3423 /* Create a dependency so it's removed when we drop the new heap. */
3426 }
3427 systable_endscan(scan);
3428
3430
3432}
3433
3434/*
3435 * Try to start a background worker to perform logical decoding of data
3436 * changes applied to relation while REPACK CONCURRENTLY is copying its
3437 * contents to a new table.
3438 */
3439static void
3441{
3442 Size size;
3443 DecodingWorkerShared *shared;
3444 shm_mq *mq;
3446
3448
3449 /* Setup shared memory. */
3450 size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
3452 decoding_worker->seg = dsm_create(size, 0);
3453
3455 shared->initialized = false;
3456 shared->lsn_upto = InvalidXLogRecPtr;
3457 shared->done = false;
3459 shared->last_exported = -1;
3460 SpinLockInit(&shared->mutex);
3461 shared->dbid = MyDatabaseId;
3462
3463 /*
3464 * This is the UserId set in cluster_rel(). Security context shouldn't be
3465 * needed for decoding worker.
3466 */
3467 shared->roleid = GetUserId();
3468 shared->relid = relid;
3469 ConditionVariableInit(&shared->cv);
3470 shared->backend_proc = MyProc;
3471 shared->backend_pid = MyProcPid;
3473
3474 mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
3477
3479
3480 memset(&bgw, 0, sizeof(bgw));
3481 snprintf(bgw.bgw_name, BGW_MAXLEN,
3482 "REPACK decoding worker for relation \"%s\"",
3483 get_rel_name(relid));
3484 snprintf(bgw.bgw_type, BGW_MAXLEN, "REPACK decoding worker");
3485 bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
3487 bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
3488 bgw.bgw_restart_time = BGW_NEVER_RESTART;
3489 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
3490 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
3492 bgw.bgw_notify_pid = MyProcPid;
3493
3495 ereport(ERROR,
3497 errmsg("out of background worker slots"),
3498 errhint("You might need to increase \"%s\".", "max_worker_processes"));
3499
3500 /*
3501 * The decoding setup must be done before the caller can have XID assigned
3502 * for any reason, otherwise the worker might end up in a deadlock,
3503 * waiting for the caller's transaction to end. Therefore wait here until
3504 * the worker indicates that it has the logical decoding initialized.
3505 */
3507 for (;;)
3508 {
3509 bool initialized;
3510
3511 SpinLockAcquire(&shared->mutex);
3512 initialized = shared->initialized;
3513 SpinLockRelease(&shared->mutex);
3514
3515 if (initialized)
3516 break;
3517
3519 }
3521}
3522
3523/*
3524 * Stop the decoding worker and clean up the related resources.
3525 *
3526 * The worker stops on its own when it knows there is no more work to do, but
3527 * we need to stop it explicitly at least on ERROR in the launching backend.
3528 */
3529static void
3531{
3532 /* Nothing to do if no worker was set up. */
3533 if (decoding_worker == NULL)
3534 return;
3535
3536 /* Terminate the worker process, if one is running. */
3537 if (decoding_worker->handle != NULL)
3538 {
3539 BgwHandleStatus status;
3540
3542 /* The worker should really exit before the REPACK command does. */
3546
3547 if (status == BGWH_POSTMASTER_DIED)
3548 ereport(FATAL,
3550 errmsg("postmaster exited during REPACK command"));
3551 }
3552
3553 /*
3554 * Now detach from our shared memory segment. In error cases there might
3555 * still be messages from the worker in the queue, which ProcessInterrupts
3556 * would try to read; this is pointless (and causes an assertion failure),
3557 * so set the global pointer to NULL to have ProcessRepackMessages ignore
3558 * them.
3559 *
3560 * We must also cancel the current sleep, if one is still set up. This is
3561 * critical because the CV lives in the DSM that we're about to detach, so
3562 * if we omit it, later automatic cleanup tries to clear freed memory.
3563 */
3567 if (decoding_worker->seg != NULL)
3571}
3572
3573/* stop_repack_decoding_worker, wrapped as a before_shmem_exit callback */
3574static void
3579
3580/*
3581 * Get the initial snapshot from the decoding worker.
3582 */
3583static Snapshot
3585{
3586 DecodingWorkerShared *shared;
3587 char fname[MAXPGPATH];
3588 BufFile *file;
3590 char *snap_space;
3591 Snapshot snapshot;
3592
3593 shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg);
3594
3595 /*
3596 * The worker needs to initialize the logical decoding, which usually
3597 * takes some time. Therefore it makes sense to prepare for the sleep
3598 * first.
3599 */
3601 for (;;)
3602 {
3603 int last_exported;
3604
3605 SpinLockAcquire(&shared->mutex);
3606 last_exported = shared->last_exported;
3607 SpinLockRelease(&shared->mutex);
3608
3609 /*
3610 * Has the worker exported the file we are waiting for?
3611 */
3612 if (last_exported == WORKER_FILE_SNAPSHOT)
3613 break;
3614
3616 }
3618
3619 /* Read the snapshot from a file. */
3621 file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
3622 BufFileReadExact(file, &snap_size, sizeof(snap_size));
3623 snap_space = (char *) palloc(snap_size);
3625 BufFileClose(file);
3626
3627 /* Restore it. */
3628 snapshot = RestoreSnapshot(snap_space);
3630
3631 return snapshot;
3632}
3633
3634/*
3635 * Generate worker's file name into 'fname', which must be of size MAXPGPATH.
3636 * If relations of the same 'relid' happen to be processed at the same time,
3637 * they must be from different databases and therefore different backends must
3638 * be involved.
3639 */
3640void
3642{
3643 /* The PID is already present in the fileset name, so we needn't add it */
3644 snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
3645}
3646
3647/*
3648 * Handle receipt of an interrupt indicating a repack worker message.
3649 *
3650 * Note: this is called within a signal handler! All we can do is set
3651 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
3652 * ProcessRepackMessages().
3653 */
3654void
3656{
3657 InterruptPending = true;
3658 RepackMessagePending = true;
3660}
3661
3662/*
3663 * Process any queued protocol messages received from the repack worker.
3664 */
3665void
3667{
3668 MemoryContext oldcontext;
3670
3671 /*
3672 * Nothing to do if we haven't launched the worker yet or have already
3673 * terminated it.
3674 */
3675 if (decoding_worker == NULL)
3676 return;
3677
3678 /*
3679 * This is invoked from ProcessInterrupts(), and since some of the
3680 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3681 * for recursive calls if more signals are received while this runs. It's
3682 * unclear that recursive entry would be safe, and it doesn't seem useful
3683 * even if it is safe, so let's block interrupts until done.
3684 */
3686
3687 /*
3688 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3689 * don't want to risk leaking data into long-lived contexts, so let's do
3690 * our work here in a private context that we can reset on each use.
3691 */
3692 if (hpm_context == NULL) /* first time through? */
3694 "ProcessRepackMessages",
3696 else
3698
3699 oldcontext = MemoryContextSwitchTo(hpm_context);
3700
3701 /* OK to process messages. Reset the flag saying there are more to do. */
3702 RepackMessagePending = false;
3703
3704 /*
3705 * Read as many messages as we can from the worker, but stop when no more
3706 * messages can be read from the worker without blocking.
3707 */
3708 while (true)
3709 {
3710 shm_mq_result res;
3711 Size nbytes;
3712 void *data;
3713
3715 &data, true);
3716 if (res == SHM_MQ_WOULD_BLOCK)
3717 break;
3718 else if (res == SHM_MQ_SUCCESS)
3719 {
3720 StringInfoData msg;
3721
3722 initStringInfo(&msg);
3723 appendBinaryStringInfo(&msg, data, nbytes);
3725 pfree(msg.data);
3726 }
3727 else
3728 {
3729 /*
3730 * The decoding worker is special in that it exits as soon as it
3731 * has its work done. Thus the DETACHED result code is fine.
3732 */
3733 Assert(res == SHM_MQ_DETACHED);
3734
3735 break;
3736 }
3737 }
3738
3739 MemoryContextSwitchTo(oldcontext);
3740
3741 /* Might as well clear the context on our way out */
3743
3745}
3746
3747/*
3748 * Process a single protocol message received from the REPACK decoding worker.
3749 */
3750static void
3752{
3753 char msgtype;
3754
3755 msgtype = pq_getmsgbyte(msg);
3756
3757 switch (msgtype)
3758 {
3761 {
3763
3764 /* Parse ErrorResponse or NoticeResponse. */
3766
3767 /* Death of a worker isn't enough justification for suicide. */
3768 edata.elevel = Min(edata.elevel, ERROR);
3769
3770 /*
3771 * Add a context line to show that this is a message
3772 * propagated from the worker. Otherwise, it can sometimes be
3773 * confusing to understand what actually happened.
3774 */
3775 if (edata.context)
3776 edata.context = psprintf("%s\n%s", edata.context,
3777 _("REPACK decoding worker"));
3778 else
3779 edata.context = pstrdup(_("REPACK decoding worker"));
3780
3781 /* Rethrow error or print notice. */
3783
3784 break;
3785 }
3786
3787 default:
3788 {
3789 elog(ERROR, "unrecognized message type received from decoding worker: %c (message length %d bytes)",
3790 msgtype, msg->len);
3791 }
3792 }
3793}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:263
@ ACLCHECK_OK
Definition acl.h:184
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition aclchk.c:4083
StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)
Definition amapi.c:161
int16 AttrNumber
Definition attnum.h:21
#define InvalidAttrNumber
Definition attnum.h:23
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_incr_param(int index, int64 incr)
void pgstat_progress_update_param(int index, int64 val)
void pgstat_progress_end_command(void)
@ PROGRESS_COMMAND_REPACK
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
Definition bgworker.c:1319
BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
Definition bgworker.c:1280
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition bgworker.c:1068
#define BGW_NEVER_RESTART
Definition bgworker.h:92
BgwHandleStatus
Definition bgworker.h:111
@ BGWH_POSTMASTER_DIED
Definition bgworker.h:115
@ BgWorkerStart_RecoveryFinished
Definition bgworker.h:88
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition bgworker.h:53
#define BGW_MAXLEN
Definition bgworker.h:93
uint32 BlockNumber
Definition block.h:31
static Datum values[MAXATTR]
Definition bootstrap.c:190
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition buffile.c:292
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition buffile.c:655
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition buffile.c:665
void BufFileClose(BufFile *file)
Definition buffile.c:413
#define RelationGetNumberOfBlocks(reln)
Definition bufmgr.h:309
#define NameStr(name)
Definition c.h:835
#define Min(x, y)
Definition c.h:1091
#define PG_USED_FOR_ASSERTS_ONLY
Definition c.h:249
#define Max(x, y)
Definition c.h:1085
#define BUFFERALIGN(LEN)
Definition c.h:898
#define VARHDRSZ
Definition c.h:781
#define Assert(condition)
Definition c.h:943
TransactionId MultiXactId
Definition c.h:746
int32_t int32
Definition c.h:620
uint64_t uint64
Definition c.h:625
uint32_t uint32
Definition c.h:624
float float4
Definition c.h:713
uint32 TransactionId
Definition c.h:736
#define OidIsValid(objectId)
Definition c.h:858
size_t Size
Definition c.h:689
bool IsToastRelation(Relation relation)
Definition catalog.c:206
bool IsSystemRelation(Relation relation)
Definition catalog.c:74
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
bool IsSystemClass(Oid relid, Form_pg_class reltuple)
Definition catalog.c:86
uint32 result
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
@ COMPARE_EQ
Definition cmptype.h:36
void analyze_rel(Oid relid, RangeVar *relation, const VacuumParams *params, List *va_cols, bool in_outer_xact, BufferAccessStrategy bstrategy)
Definition analyze.c:110
bool ConditionVariableCancelSleep(void)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
bool defGetBoolean(DefElem *def)
Definition define.c:93
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition dependency.c:279
@ DEPENDENCY_AUTO
Definition dependency.h:34
@ DEPENDENCY_INTERNAL
Definition dependency.h:35
#define PERFORM_DELETION_INTERNAL
Definition dependency.h:92
dsm_handle dsm_segment_handle(dsm_segment *seg)
Definition dsm.c:1131
void dsm_detach(dsm_segment *seg)
Definition dsm.c:811
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1103
dsm_segment * dsm_create(Size size, int flags)
Definition dsm.c:524
Datum arg
Definition elog.c:1323
void ThrowErrorData(ErrorData *edata)
Definition elog.c:2091
int errcode(int sqlerrcode)
Definition elog.c:875
#define _(x)
Definition elog.c:96
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define FATAL
Definition elog.h:42
#define WARNING
Definition elog.h:37
#define DEBUG2
Definition elog.h:30
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define INFO
Definition elog.h:35
#define ereport(elevel,...)
Definition elog.h:152
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, EState *estate, uint32 flags, TupleTableSlot *slot, List *arbiterIndexes, bool *specConflict)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition execMain.c:1271
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
void FreeExecutorState(EState *estate)
Definition execUtils.c:197
EState * CreateExecutorState(void)
Definition execUtils.c:90
#define ResetPerTupleExprContext(estate)
Definition executor.h:676
#define EIIT_IS_UPDATE
Definition executor.h:757
#define GetPerTupleMemoryContext(estate)
Definition executor.h:672
#define EIIT_ONLY_SUMMARIZING
Definition executor.h:759
#define palloc_object(type)
Definition fe_memutils.h:89
#define palloc_array(type, count)
Definition fe_memutils.h:91
#define palloc0_object(type)
Definition fe_memutils.h:90
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition fmgr.c:1151
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:604
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:515
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
volatile sig_atomic_t InterruptPending
Definition globals.c:32
int MyProcPid
Definition globals.c:49
ProcNumber MyProcNumber
Definition globals.c:92
bool allowSystemTableMods
Definition globals.c:132
struct Latch * MyLatch
Definition globals.c:65
Oid MyDatabaseId
Definition globals.c:96
int NewGUCNestLevel(void)
Definition guc.c:2142
void RestrictSearchPath(void)
Definition guc.c:2153
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition guc.c:2169
void RelationClearMissing(Relation rel)
Definition heap.c:1984
Oid heap_create_with_catalog(const char *relname, Oid relnamespace, Oid reltablespace, Oid relid, Oid reltypeid, Oid reloftypeid, Oid ownerid, Oid accessmtd, TupleDesc tupdesc, List *cooked_constraints, char relkind, char relpersistence, bool shared_relation, bool mapped_relation, OnCommitAction oncommit, Datum reloptions, bool use_user_acl, bool allow_system_table_mods, bool is_internal, Oid relrewrite, ObjectAddress *typaddress)
Definition heap.c:1140
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition heapam.c:1435
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition heaptuple.c:456
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HEAPTUPLESIZE
Definition htup.h:73
HeapTupleData * HeapTuple
Definition htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition htup.h:23
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
Oid IndexGetRelation(Oid indexId, bool missing_ok)
Definition index.c:3604
bool reindex_relation(const ReindexStmt *stmt, Oid relid, int flags, const ReindexParams *params)
Definition index.c:3969
Oid index_create_copy(Relation heapRelation, uint16 flags, Oid oldIndexId, Oid tablespaceOid, const char *newName)
Definition index.c:1306
#define INDEX_CREATE_SUPPRESS_PROGRESS
Definition index.h:74
#define REINDEX_REL_FORCE_INDEXES_UNLOGGED
Definition index.h:169
#define REINDEX_REL_SUPPRESS_INDEX_USE
Definition index.h:167
#define REINDEX_REL_FORCE_INDEXES_PERMANENT
Definition index.h:170
#define REINDEX_REL_CHECK_CONSTRAINTS
Definition index.h:168
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition indexam.c:698
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys, uint32 flags)
Definition indexam.c:257
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
void index_endscan(IndexScanDesc scan)
Definition indexam.c:394
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition indexam.c:368
char * ChooseRelationName(const char *name1, const char *name2, const char *label, Oid namespaceid, bool isconstraint)
Definition indexcmds.c:2634
void CatalogTupleUpdateWithInfo(Relation heapRel, const ItemPointerData *otid, HeapTuple tup, CatalogIndexState indstate)
Definition indexing.c:337
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CatalogCloseIndexes(CatalogIndexState indstate)
Definition indexing.c:61
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
CatalogIndexState CatalogOpenIndexes(Relation heapRel)
Definition indexing.c:43
static struct @177 value
#define INJECTION_POINT(name, arg)
void CacheInvalidateCatalog(Oid catalogId)
Definition inval.c:1609
void CacheInvalidateRelcacheByTuple(HeapTuple classTuple)
Definition inval.c:1666
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:52
int i
Definition isn.c:77
static void ItemPointerSetInvalid(ItemPointerData *pointer)
Definition itemptr.h:184
void SetLatch(Latch *latch)
Definition latch.c:290
List * lappend(List *list, void *datum)
Definition list.c:339
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
void list_free(List *list)
Definition list.c:1546
bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:151
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
Definition lmgr.c:334
bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
Definition lmgr.c:351
int LOCKMODE
Definition lockdefs.h:26
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define ShareUpdateExclusiveLock
Definition lockdefs.h:39
#define RowExclusiveLock
Definition lockdefs.h:38
LockTupleMode
Definition lockoptions.h:51
char * get_rel_name(Oid relid)
Definition lsyscache.c:2159
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2234
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2183
RegProcedure get_opcode(Oid opno)
Definition lsyscache.c:1516
bool get_index_isclustered(Oid index_oid)
Definition lsyscache.c:3879
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition lsyscache.c:170
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3599
Oid get_relname_relid(const char *relname, Oid relnamespace)
Definition lsyscache.c:2116
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:406
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
MemoryContext TopMemoryContext
Definition mcxt.c:167
void * palloc(Size size)
Definition mcxt.c:1390
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
MemoryContext PortalContext
Definition mcxt.c:176
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define SECURITY_RESTRICTED_OPERATION
Definition miscadmin.h:322
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition miscinit.c:613
Oid GetUserId(void)
Definition miscinit.c:470
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition miscinit.c:620
bool MultiXactIdPrecedes(MultiXactId multi1, MultiXactId multi2)
Definition multixact.c:2865
#define MultiXactIdIsValid(multi)
Definition multixact.h:29
#define InvalidMultiXactId
Definition multixact.h:25
bool isTempOrTempToastNamespace(Oid namespaceId)
Definition namespace.c:3745
Oid LookupCreationNamespace(const char *nspname)
Definition namespace.c:3500
Oid RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode, uint32 flags, RangeVarGetRelidCallback callback, void *callback_arg)
Definition namespace.c:442
static char * errmsg
#define InvokeObjectPostAlterHookArg(classId, objectId, subId, auxiliaryId, is_internal)
#define ObjectAddressSet(addr, class_id, object_id)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
int parser_errposition(ParseState *pstate, int location)
Definition parse_node.c:106
RepackCommand
@ REPACK_COMMAND_REPACK
@ REPACK_COMMAND_CLUSTER
@ REPACK_COMMAND_VACUUMFULL
#define ACL_MAINTAIN
Definition parsenodes.h:90
@ DROP_RESTRICT
static int verbose
#define ERRCODE_DATA_CORRUPTED
FormData_pg_class * Form_pg_class
Definition pg_class.h:160
#define NAMEDATALEN
#define MAXPGPATH
END_CATALOG_STRUCT typedef FormData_pg_constraint * Form_pg_constraint
const void * data
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:51
long changeDependencyFor(Oid classId, Oid objectId, Oid refClassId, Oid oldRefObjectId, Oid newRefObjectId)
Definition pg_depend.c:470
long deleteDependencyRecordsFor(Oid classId, Oid objectId, bool skipExtensionDeps)
Definition pg_depend.c:314
END_CATALOG_STRUCT typedef FormData_pg_index * Form_pg_index
Definition pg_index.h:74
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define forboth(cell1, list1, cell2, list2)
Definition pg_list.h:550
#define foreach_current_index(var_or_cell)
Definition pg_list.h:435
static Oid list_nth_oid(const List *list, int n)
Definition pg_list.h:353
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
#define foreach_node(type, var, lst)
Definition pg_list.h:528
#define foreach_oid(var, lst)
Definition pg_list.h:503
#define lfirst_oid(lc)
Definition pg_list.h:174
const char * pg_rusage_show(const PGRUsage *ru0)
Definition pg_rusage.c:40
void pg_rusage_init(PGRUsage *ru0)
Definition pg_rusage.c:27
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition pgbench.c:77
bool plan_cluster_use_sort(Oid tableOid, Oid indexOid)
Definition planner.c:7161
#define snprintf
Definition port.h:261
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
static Datum UInt32GetDatum(uint32 X)
Definition postgres.h:232
#define PointerGetDatum(X)
Definition postgres.h:354
#define InvalidOid
unsigned int Oid
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
Definition pqmq.c:229
void TransferPredicateLocksToHeapRelation(Relation relation)
Definition predicate.c:3052
static int fb(int x)
@ ONCOMMIT_NOOP
Definition primnodes.h:59
#define PROGRESS_REPACK_PHASE_CATCH_UP
Definition progress.h:103
#define PROGRESS_REPACK_PHASE
Definition progress.h:86
#define PROGRESS_REPACK_COMMAND
Definition progress.h:85
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES
Definition progress.h:104
#define PROGRESS_REPACK_HEAP_TUPLES_DELETED
Definition progress.h:91
#define PROGRESS_REPACK_HEAP_TUPLES_UPDATED
Definition progress.h:90
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP
Definition progress.h:106
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX
Definition progress.h:105
#define PROGRESS_REPACK_HEAP_TUPLES_INSERTED
Definition progress.h:89
#define PqMsg_ErrorResponse
Definition protocol.h:44
#define PqMsg_NoticeResponse
Definition protocol.h:49
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
static long analyze(struct nfa *nfa)
Definition regc_nfa.c:3051
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationIsMapped(relation)
Definition rel.h:565
#define RelationGetRelationName(relation)
Definition rel.h:550
#define RelationIsPopulated(relation)
Definition rel.h:697
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:678
#define RelationGetNamespace(relation)
Definition rel.h:557
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4837
void RelationAssumeNewRelfilelocator(Relation relation)
Definition relcache.c:3978
void RelationMapRemoveMapping(Oid relationId)
Definition relmapper.c:439
RelFileNumber RelationMapOidToFilenumber(Oid relationId, bool shared)
Definition relmapper.c:166
void RelationMapUpdateMap(Oid relationId, RelFileNumber fileNumber, bool shared, bool immediate)
Definition relmapper.c:326
Oid RelFileNumber
Definition relpath.h:25
#define RelFileNumberIsValid(relnumber)
Definition relpath.h:27
static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid, Oid userid, LOCKMODE lmode, int options)
Definition repack.c:708
static void restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot)
Definition repack.c:2751
static void start_repack_decoding_worker(Oid relid)
Definition repack.c:3440
static void check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
Definition repack.c:895
static bool find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator, TupleTableSlot *retrieved)
Definition repack.c:2856
static List * get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid, bool rel_is_index, MemoryContext permcxt)
Definition repack.c:2285
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition repack.c:1911
static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, ClusterParams *params)
Definition repack.c:2376
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:769
static void release_change_context(ChangeContext *chgcxt)
Definition repack.c:3094
static bool repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid)
Definition repack.c:2348
void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
Definition repack.c:247
static void stop_repack_decoding_worker(void)
Definition repack.c:3530
static LOCKMODE RepackLockLevel(bool concurrent)
Definition repack.c:488
static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot)
Definition repack.c:2717
static void ProcessRepackMessage(StringInfo msg)
Definition repack.c:3751
volatile sig_atomic_t RepackMessagePending
Definition repack.c:152
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:521
static List * get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
Definition repack.c:2132
static const char * RepackCommandAsString(RepackCommand cmd)
Definition repack.c:2510
void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
Definition repack.c:3641
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
Definition repack.c:1154
static void initialize_change_context(ChangeContext *chgcxt, Relation relation, Oid ident_index_id)
Definition repack.c:3002
static bool identity_key_equal(ChangeContext *chgcxt, TupleTableSlot *locator, TupleTableSlot *candidate)
Definition repack.c:2909
static void process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done)
Definition repack.c:2946
static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, Snapshot snapshot, bool verbose, bool *pSwapToastByContent, TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
Definition repack.c:1283
static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Oid identIdx, TransactionId frozenXid, MultiXactId cutoffMulti)
Definition repack.c:3112
static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot, ChangeContext *chgcxt)
Definition repack.c:2654
static void stop_repack_decoding_worker_cb(int code, Datum arg)
Definition repack.c:3575
static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
Definition repack.c:2528
static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid ident_idx)
Definition repack.c:1007
void HandleRepackMessageInterrupt(void)
Definition repack.c:3655
static Snapshot get_initial_snapshot(DecodingWorker *worker)
Definition repack.c:3584
void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
Definition repack.c:829
static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src)
Definition repack.c:2821
#define WORKER_FILE_SNAPSHOT
Definition repack.c:100
static void swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, bool swap_toast_by_content, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, Oid *mapped_tables)
Definition repack.c:1529
static Oid determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
Definition repack.c:2467
void ProcessRepackMessages(void)
Definition repack.c:3666
static void copy_index_constraints(Relation old_index, Oid new_index_id, Oid new_heap_id)
Definition repack.c:3373
static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple, TupleTableSlot *ondisk_tuple, ChangeContext *chgcxt)
Definition repack.c:2675
static DecodingWorker * decoding_worker
Definition repack.c:146
static List * build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
Definition repack.c:3328
#define CLUOPT_VERBOSE
Definition repack.h:25
#define CLUOPT_ANALYZE
Definition repack.h:28
#define CLUOPT_CONCURRENT
Definition repack.h:29
#define CLUOPT_RECHECK_ISCLUSTERED
Definition repack.h:27
#define CLUOPT_RECHECK
Definition repack.h:26
#define CHANGE_UPDATE_OLD
#define CHANGE_DELETE
#define CHANGE_UPDATE_NEW
char ConcurrentChangeKind
#define CHANGE_INSERT
#define REPACK_ERROR_QUEUE_SIZE
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
@ ForwardScanDirection
Definition sdir.h:28
void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
shm_mq * shm_mq_create(void *address, Size size)
Definition shm_mq.c:179
void shm_mq_detach(shm_mq_handle *mqh)
Definition shm_mq.c:845
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:208
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
ScanKeyData * ScanKey
Definition skey.h:75
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
Snapshot RestoreSnapshot(char *start_address)
Definition snapmgr.c:1793
void UpdateActiveSnapshotCommandId(void)
Definition snapmgr.c:744
void PopActiveSnapshot(void)
Definition snapmgr.c:775
Snapshot GetActiveSnapshot(void)
Definition snapmgr.c:800
#define InvalidSnapshot
Definition snapshot.h:119
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
void relation_close(Relation relation, LOCKMODE lockmode)
Definition relation.c:206
Relation relation_open(Oid relationId, LOCKMODE lockmode)
Definition relation.c:48
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
PGPROC * MyProc
Definition proc.c:71
void BecomeLockGroupLeader(void)
Definition proc.c:2075
uint16 StrategyNumber
Definition stratnum.h:22
#define InvalidStrategy
Definition stratnum.h:24
#define BTEqualStrategyNumber
Definition stratnum.h:31
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
int cc_file_seq
Definition repack.c:127
int cc_ident_key_nentries
Definition repack.c:121
Relation cc_rel
Definition repack.c:108
AttrNumber cc_last_key_attno
Definition repack.c:124
Relation cc_ident_index
Definition repack.c:119
ScanKey cc_ident_key
Definition repack.c:120
EState * cc_estate
Definition repack.c:112
ResultRelInfo * cc_rri
Definition repack.c:111
uint32 options
Definition repack.h:34
bool attisdropped
Definition tupdesc.h:78
ConditionVariable cv
char error_queue[FLEXIBLE_ARRAY_MEMBER]
dsm_segment * seg
Definition repack.c:139
BackgroundWorkerHandle * handle
Definition repack.c:136
shm_mq_handle * error_mqh
Definition repack.c:142
Definition pg_list.h:54
Oid indexOid
Definition repack.c:93
Oid tableOid
Definition repack.c:92
bool rd_ispkdeferrable
Definition rel.h:154
Oid rd_pkindex
Definition rel.h:153
Form_pg_class rd_rel
Definition rel.h:111
Datum sk_argument
Definition skey.h:72
FmgrInfo sk_func
Definition skey.h:71
Oid sk_collation
Definition skey.h:70
TupleDesc tts_tupleDescriptor
Definition tuptable.h:129
bool * tts_isnull
Definition tuptable.h:133
ItemPointerData tts_tid
Definition tuptable.h:142
Datum * tts_values
Definition tuptable.h:131
TransactionId FreezeLimit
Definition vacuum.h:288
TransactionId OldestXmin
Definition vacuum.h:278
TransactionId relfrozenxid
Definition vacuum.h:262
MultiXactId relminmxid
Definition vacuum.h:263
MultiXactId MultiXactCutoff
Definition vacuum.h:289
Definition type.h:97
Definition c.h:776
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
#define SearchSysCacheExists1(cacheId, key1)
Definition syscache.h:100
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:60
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, ScanKeyData *key)
Definition tableam.c:113
const TupleTableSlotOps * table_slot_callbacks(Relation relation)
Definition tableam.c:59
TU_UpdateIndexes
Definition tableam.h:133
@ TU_Summarizing
Definition tableam.h:141
@ TU_None
Definition tableam.h:135
static void table_endscan(TableScanDesc scan)
Definition tableam.h:1061
#define TABLE_INSERT_NO_LOGICAL
Definition tableam.h:286
TM_Result
Definition tableam.h:95
@ TM_Ok
Definition tableam.h:100
static void table_relation_copy_for_cluster(Relation OldTable, Relation NewTable, Relation OldIndex, bool use_sort, TransactionId OldestXmin, Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, double *tups_recently_dead)
Definition tableam.h:1746
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, uint32 options, BulkInsertStateData *bistate)
Definition tableam.h:1458
#define TABLE_DELETE_NO_LOGICAL
Definition tableam.h:290
#define TABLE_UPDATE_NO_LOGICAL
Definition tableam.h:293
static TM_Result table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, CommandId cid, uint32 options, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes)
Definition tableam.h:1598
static TM_Result table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid, uint32 options, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd)
Definition tableam.h:1549
void ResetRelRewrite(Oid myrelid)
Definition tablecmds.c:4420
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition tablecmds.c:4473
void RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bool is_index)
Definition tablecmds.c:4327
void RangeVarCallbackMaintainsTable(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg)
Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock)
void NewHeapCreateToastTable(Oid relOid, Datum reloptions, LOCKMODE lockmode, Oid OIDOldToast)
Definition toasting.c:65
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsValid(xid)
Definition transam.h:41
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:195
static void slot_getsomeattrs(TupleTableSlot *slot, int attnum)
Definition tuptable.h:376
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
static bool slot_attisnull(TupleTableSlot *slot, int attnum)
Definition tuptable.h:403
bool vacuum_get_cutoffs(Relation rel, const VacuumParams *params, struct VacuumCutoffs *cutoffs)
Definition vacuum.c:1106
#define VACOPT_VERBOSE
Definition vacuum.h:181
#define VACOPT_ANALYZE
Definition vacuum.h:180
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)
Definition varatt.h:361
static Size VARSIZE_ANY(const void *PTR)
Definition varatt.h:460
static bool VARATT_IS_EXTERNAL_INDIRECT(const void *PTR)
Definition varatt.h:368
static bool initialized
Definition win32ntdll.c:36
void CommandCounterIncrement(void)
Definition xact.c:1130
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207
CommandId GetCurrentCommandId(bool used)
Definition xact.c:831
int wal_level
Definition xlog.c:138
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6999
XLogRecPtr GetXLogInsertEndRecPtr(void)
Definition xlog.c:10110
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
@ WAL_LEVEL_REPLICA
Definition xlog.h:77
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28