PostgreSQL Source Code git master
Loading...
Searching...
No Matches
repack.h File Reference
#include <signal.h>
#include "nodes/parsenodes.h"
#include "parser/parse_node.h"
#include "storage/lockdefs.h"
#include "utils/relcache.h"
Include dependency graph for repack.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ClusterParams
 

Macros

#define CLUOPT_VERBOSE   0x01 /* print progress info */
 
#define CLUOPT_RECHECK   0x02 /* recheck relation state */
 
#define CLUOPT_RECHECK_ISCLUSTERED
 
#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */
 
#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */
 

Typedefs

typedef struct ClusterParams ClusterParams
 

Functions

void ExecRepack (ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
 
void cluster_rel (RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
 
void check_index_is_clusterable (Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
 
void mark_index_clustered (Relation rel, Oid indexOid, bool is_internal)
 
Oid make_new_heap (Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
 
void finish_heap_swap (Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
 
void HandleRepackMessageInterrupt (void)
 
void ProcessRepackMessages (void)
 
void RepackWorkerMain (Datum main_arg)
 
bool AmRepackWorker (void)
 

Variables

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
 

Macro Definition Documentation

◆ CLUOPT_ANALYZE

#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */

Definition at line 28 of file repack.h.

◆ CLUOPT_CONCURRENT

#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */

Definition at line 29 of file repack.h.

◆ CLUOPT_RECHECK

#define CLUOPT_RECHECK   0x02 /* recheck relation state */

Definition at line 26 of file repack.h.

◆ CLUOPT_RECHECK_ISCLUSTERED

#define CLUOPT_RECHECK_ISCLUSTERED
Value:
0x04 /* recheck relation state for
* indisclustered */

Definition at line 27 of file repack.h.

34{
35 uint32 options; /* bitmask of CLUOPT_* */
37
39
40
41extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
42
43extern void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
44 ClusterParams *params, bool isTopLevel);
46 LOCKMODE lockmode);
47extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
48
50 char relpersistence, LOCKMODE lockmode);
55 bool is_internal,
56 bool reindex,
59 char newrelpersistence);
60
61extern void HandleRepackMessageInterrupt(void);
62extern void ProcessRepackMessages(void);
63
64/* in repack_worker.c */
66extern bool AmRepackWorker(void);
67
68#endif /* REPACK_H */
#define PGDLLIMPORT
Definition c.h:1421
TransactionId MultiXactId
Definition c.h:746
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
#define stmt
int LOCKMODE
Definition lockdefs.h:26
RepackCommand
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
static int fb(int x)
bool AmRepackWorker(void)
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition repack.c:1870
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:758
void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
Definition repack.c:244
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:509
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
Definition repack.c:1113
void HandleRepackMessageInterrupt(void)
Definition repack.c:3600
void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
Definition repack.c:818
void RepackWorkerMain(Datum main_arg)
PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
Definition repack.c:150
void ProcessRepackMessages(void)
Definition repack.c:3611

◆ CLUOPT_VERBOSE

#define CLUOPT_VERBOSE   0x01 /* print progress info */

Definition at line 25 of file repack.h.

Typedef Documentation

◆ ClusterParams

Function Documentation

◆ AmRepackWorker()

bool AmRepackWorker ( void  )
extern

Definition at line 187 of file repack_worker.c.

188{
189 return am_repack_worker;
190}
static bool am_repack_worker

References am_repack_worker.

Referenced by mq_putmessage().

◆ check_index_is_clusterable()

void check_index_is_clusterable ( Relation  OldHeap,
Oid  indexOid,
LOCKMODE  lockmode 
)
extern

Definition at line 758 of file repack.c.

759{
761
762 OldIndex = index_open(indexOid, lockmode);
763
764 /*
765 * Check that index is in fact an index on the given relation
766 */
767 if (OldIndex->rd_index == NULL ||
768 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
771 errmsg("\"%s\" is not an index for table \"%s\"",
774
775 /* Index AM must allow clustering */
776 if (!OldIndex->rd_indam->amclusterable)
779 errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
781
782 /*
783 * Disallow clustering on incomplete indexes (those that might not index
784 * every row of the relation). We could relax this by making a separate
785 * seqscan pass over the table to copy the missing rows, but that seems
786 * expensive and tedious.
787 */
788 if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
791 errmsg("cannot cluster on partial index \"%s\"",
793
794 /*
795 * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
796 * it might well not contain entries for every heap row, or might not even
797 * be internally consistent. (But note that we don't check indcheckxmin;
798 * the worst consequence of following broken HOT chains would be that we
799 * might put recently-dead tuples out-of-order in the new table, and there
800 * is little harm in that.)
801 */
802 if (!OldIndex->rd_index->indisvalid)
805 errmsg("cannot cluster on invalid index \"%s\"",
807
808 /* Drop relcache refcnt on OldIndex, but keep lock */
810}
int errcode(int sqlerrcode)
Definition elog.c:875
#define ERROR
Definition elog.h:40
#define ereport(elevel,...)
Definition elog.h:152
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition heaptuple.c:456
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
#define NoLock
Definition lockdefs.h:34
static char * errmsg
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetRelationName(relation)
Definition rel.h:550

References ereport, errcode(), errmsg, ERROR, fb(), heap_attisnull(), index_close(), index_open(), NoLock, RelationGetRelationName, and RelationGetRelid.

Referenced by ATExecClusterOn(), cluster_rel(), ExecRepack(), and process_single_relation().

◆ cluster_rel()

void cluster_rel ( RepackCommand  cmd,
Relation  OldHeap,
Oid  indexOid,
ClusterParams params,
bool  isTopLevel 
)
extern

Definition at line 509 of file repack.c.

511{
512 Oid tableOid = RelationGetRelid(OldHeap);
515 Oid save_userid;
516 int save_sec_context;
517 int save_nestlevel;
518 bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
519 bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
520 bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
522
523 /* Determine the lock mode to use. */
524 lmode = RepackLockLevel(concurrent);
525
526 /*
527 * Check some preconditions in the concurrent case. This also obtains the
528 * replica index OID.
529 */
530 if (concurrent)
532
533 /* Check for user-requested abort. */
535
538
539 /*
540 * Switch to the table owner's userid, so that any index functions are run
541 * as that user. Also lock down security-restricted operations and
542 * arrange to make GUC variable changes local to this command.
543 */
544 GetUserIdAndSecContext(&save_userid, &save_sec_context);
545 SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
546 save_sec_context | SECURITY_RESTRICTED_OPERATION);
547 save_nestlevel = NewGUCNestLevel();
549
550 /*
551 * Recheck that the relation is still what it was when we started.
552 *
553 * Note that it's critical to skip this in single-relation CLUSTER;
554 * otherwise, we would reject an attempt to cluster using a
555 * not-previously-clustered index.
556 */
557 if (recheck &&
558 !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
559 lmode, params->options))
560 goto out;
561
562 /*
563 * We allow repacking shared catalogs only when not using an index. It
564 * would work to use an index in most respects, but the index would only
565 * get marked as indisclustered in the current database, leading to
566 * unexpected behavior if CLUSTER were later invoked in another database.
567 */
568 if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
571 /*- translator: first %s is name of a SQL command, eg. REPACK */
572 errmsg("cannot execute %s on a shared catalog",
574
575 /*
576 * The CONCURRENTLY case should have been rejected earlier because it does
577 * not support system catalogs.
578 */
579 Assert(!(OldHeap->rd_rel->relisshared && concurrent));
580
581 /*
582 * Don't process temp tables of other backends ... their local buffer
583 * manager is not going to cope.
584 */
588 /*- translator: first %s is name of a SQL command, eg. REPACK */
589 errmsg("cannot execute %s on temporary tables of other sessions",
591
592 /*
593 * Also check for active uses of the relation in the current transaction,
594 * including open scans and pending AFTER trigger events.
595 */
597
598 /* Check heap and index are valid to cluster on */
599 if (OidIsValid(indexOid))
600 {
601 /* verify the index is good and lock it */
603 /* also open it */
604 index = index_open(indexOid, NoLock);
605 }
606 else
607 index = NULL;
608
609 /*
610 * When allow_system_table_mods is turned off, we disallow repacking a
611 * catalog on a particular index unless that's already the clustered index
612 * for that catalog.
613 *
614 * XXX We don't check for this in CLUSTER, because it's historically been
615 * allowed.
616 */
617 if (cmd != REPACK_COMMAND_CLUSTER &&
618 !allowSystemTableMods && OidIsValid(indexOid) &&
619 IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
622 errmsg("permission denied: \"%s\" is a system catalog",
624 errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
625 "allow_system_table_mods"));
626
627 /*
628 * Quietly ignore the request if this is a materialized view which has not
629 * been populated from its query. No harm is done because there is no data
630 * to deal with, and we don't want to throw an error if this is part of a
631 * multi-relation request -- for example, CLUSTER was run on the entire
632 * database.
633 */
634 if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
636 {
637 if (index)
640 goto out;
641 }
642
643 Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
644 OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
645 OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
646
647 /*
648 * All predicate locks on the tuples or pages are about to be made
649 * invalid, because we move tuples around. Promote them to relation
650 * locks. Predicate locks on indexes will be promoted when they are
651 * reindexed.
652 *
653 * During concurrent processing, the heap as well as its indexes stay in
654 * operation, so we postpone this step until they are locked using
655 * AccessExclusiveLock near the end of the processing.
656 */
657 if (!concurrent)
659
660 /* rebuild_relation does all the dirty work */
661 PG_TRY();
662 {
664 }
665 PG_FINALLY();
666 {
667 if (concurrent)
668 {
669 /*
670 * Since during normal operation the worker was already asked to
671 * exit, stopping it explicitly is especially important on ERROR.
672 * However it still seems a good practice to make sure that the
673 * worker never survives the REPACK command.
674 */
676 }
677 }
678 PG_END_TRY();
679
680 /* rebuild_relation closes OldHeap, and index if valid */
681
682out:
683 /* Roll back any GUC changes executed by index functions */
684 AtEOXact_GUC(false, save_nestlevel);
685
686 /* Restore userid and security context */
687 SetUserIdAndSecContext(save_userid, save_sec_context);
688
690}
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_param(int index, int64 val)
void pgstat_progress_end_command(void)
@ PROGRESS_COMMAND_REPACK
#define Assert(condition)
Definition c.h:943
#define OidIsValid(objectId)
Definition c.h:858
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define PG_END_TRY(...)
Definition elog.h:399
#define PG_FINALLY(...)
Definition elog.h:391
bool allowSystemTableMods
Definition globals.c:132
int NewGUCNestLevel(void)
Definition guc.c:2142
void RestrictSearchPath(void)
Definition guc.c:2153
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition guc.c:2169
#define SECURITY_RESTRICTED_OPERATION
Definition miscadmin.h:331
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition miscinit.c:613
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition miscinit.c:620
@ REPACK_COMMAND_CLUSTER
static int verbose
#define InvalidOid
void TransferPredicateLocksToHeapRelation(Relation relation)
Definition predicate.c:3052
#define PROGRESS_REPACK_COMMAND
Definition progress.h:85
#define RelationIsPopulated(relation)
Definition rel.h:688
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:669
static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid, Oid userid, LOCKMODE lmode, int options)
Definition repack.c:697
static void check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
Definition repack.c:884
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:758
static void stop_repack_decoding_worker(void)
Definition repack.c:3487
static LOCKMODE RepackLockLevel(bool concurrent)
Definition repack.c:476
static const char * RepackCommandAsString(RepackCommand cmd)
Definition repack.c:2469
static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid ident_idx)
Definition repack.c:966
#define CLUOPT_VERBOSE
Definition repack.h:25
#define CLUOPT_CONCURRENT
Definition repack.h:29
#define CLUOPT_RECHECK
Definition repack.h:26
void relation_close(Relation relation, LOCKMODE lockmode)
Definition relation.c:206
uint32 options
Definition repack.h:34
Definition type.h:97
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition tablecmds.c:4473

References allowSystemTableMods, Assert, AtEOXact_GUC(), check_concurrent_repack_requirements(), CHECK_FOR_INTERRUPTS, check_index_is_clusterable(), CheckTableNotInUse(), CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_VERBOSE, cluster_rel_recheck(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetUserIdAndSecContext(), index_close(), index_open(), InvalidOid, IsCatalogRelation(), NewGUCNestLevel(), NoLock, OidIsValid, ClusterParams::options, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_progress_end_command(), pgstat_progress_start_command(), pgstat_progress_update_param(), PROGRESS_COMMAND_REPACK, PROGRESS_REPACK_COMMAND, rebuild_relation(), relation_close(), RELATION_IS_OTHER_TEMP, RelationGetRelationName, RelationGetRelid, RelationIsPopulated, REPACK_COMMAND_CLUSTER, RepackCommandAsString(), RepackLockLevel(), RestrictSearchPath(), SECURITY_RESTRICTED_OPERATION, SetUserIdAndSecContext(), stop_repack_decoding_worker(), TransferPredicateLocksToHeapRelation(), and verbose.

Referenced by ExecRepack(), process_single_relation(), and vacuum_rel().

◆ ExecRepack()

void ExecRepack ( ParseState pstate,
RepackStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 244 of file repack.c.

245{
246 ClusterParams params = {0};
247 Relation rel = NULL;
249 LOCKMODE lockmode;
250 List *rtcs;
251
252 /* Parse option list */
253 foreach_node(DefElem, opt, stmt->params)
254 {
255 if (strcmp(opt->defname, "verbose") == 0)
256 params.options |= defGetBoolean(opt) ? CLUOPT_VERBOSE : 0;
257 else if (strcmp(opt->defname, "analyze") == 0 ||
258 strcmp(opt->defname, "analyse") == 0)
259 params.options |= defGetBoolean(opt) ? CLUOPT_ANALYZE : 0;
260 else if (strcmp(opt->defname, "concurrently") == 0 &&
261 defGetBoolean(opt))
262 {
263 if (stmt->command != REPACK_COMMAND_REPACK)
266 errmsg("CONCURRENTLY option not supported for %s",
267 RepackCommandAsString(stmt->command)));
268 params.options |= CLUOPT_CONCURRENT;
269 }
270 else
273 errmsg("unrecognized %s option \"%s\"",
274 RepackCommandAsString(stmt->command),
275 opt->defname),
276 parser_errposition(pstate, opt->location));
277 }
278
279 /* Determine the lock mode to use. */
280 lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
281
282 if ((params.options & CLUOPT_CONCURRENT) != 0)
283 {
284 /*
285 * Make sure we're not in a transaction block.
286 *
287 * The reason is that repack_setup_logical_decoding() could wait
288 * indefinitely for our XID to complete. (The deadlock detector would
289 * not recognize it because we'd be waiting for ourselves, i.e. no
290 * real lock conflict.) It would be possible to run in a transaction
291 * block if we had no XID, but this restriction is simpler for users
292 * to understand and we don't lose any functionality.
293 */
294 PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
295 }
296
297 /*
298 * If a single relation is specified, process it and we're done ... unless
299 * the relation is a partitioned table, in which case we fall through.
300 */
301 if (stmt->relation != NULL)
302 {
303 rel = process_single_relation(stmt, lockmode, isTopLevel, &params);
304 if (rel == NULL)
305 return; /* all done */
306 }
307
308 /*
309 * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
310 * can add support for this later.
311 */
312 if (params.options & CLUOPT_ANALYZE)
315 errmsg("cannot execute %s on multiple tables",
316 "REPACK (ANALYZE)"));
317
318 /*
319 * By here, we know we are in a multi-table situation.
320 *
321 * Concurrent processing is currently considered rather special (e.g. in
322 * terms of resources consumed) so it is not performed in bulk.
323 */
324 if (params.options & CLUOPT_CONCURRENT)
325 {
326 if (rel != NULL)
327 {
328 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
331 errmsg("REPACK (CONCURRENTLY) is not supported for partitioned tables"),
332 errhint("Consider running the command on individual partitions."));
333 }
334 else
337 errmsg("REPACK (CONCURRENTLY) requires an explicit table name"));
338 }
339
340 /*
341 * In order to avoid holding locks for too long, we want to process each
342 * table in its own transaction. This forces us to disallow running
343 * inside a user transaction block.
344 */
346
347 /* Also, we need a memory context to hold our list of relations */
349 "Repack",
351
352 /*
353 * Since we open a new transaction for each relation, we have to check
354 * that the relation still is what we think it is.
355 *
356 * In single-transaction CLUSTER, we don't need the overhead.
357 */
358 params.options |= CLUOPT_RECHECK;
359
360 /*
361 * If we don't have a relation yet, determine a relation list. If we do,
362 * then it must be a partitioned table, and we want to process its
363 * partitions.
364 */
365 if (rel == NULL)
366 {
367 Assert(stmt->indexname == NULL);
368 rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
371 }
372 else
373 {
374 Oid relid;
375 bool rel_is_index;
376
377 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
378
379 /*
380 * If USING INDEX was specified, resolve the index name now and pass
381 * it down.
382 */
383 if (stmt->usingindex)
384 {
385 /*
386 * If no index name was specified when repacking a partitioned
387 * table, punt for now. Maybe we can improve this later.
388 */
389 if (!stmt->indexname)
390 {
391 if (stmt->command == REPACK_COMMAND_CLUSTER)
394 errmsg("there is no previously clustered index for table \"%s\"",
396 else
399 /*- translator: first %s is name of a SQL command, eg. REPACK */
400 errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
401 RepackCommandAsString(stmt->command),
403 }
404
405 relid = determine_clustered_index(rel, stmt->usingindex,
406 stmt->indexname);
407 if (!OidIsValid(relid))
408 elog(ERROR, "unable to determine index to cluster on");
410
411 rel_is_index = true;
412 }
413 else
414 {
415 relid = RelationGetRelid(rel);
416 rel_is_index = false;
417 }
418
420 relid, rel_is_index,
422
423 /* close parent relation, releasing lock on it */
425 rel = NULL;
426 }
427
428 /* Commit to get out of starting transaction */
431
432 /* Cluster the tables, each in a separate transaction */
433 Assert(rel == NULL);
435 {
436 /* Start a new transaction for each relation. */
438
439 /*
440 * Open the target table, coping with the case where it has been
441 * dropped.
442 */
443 rel = try_table_open(rtc->tableOid, lockmode);
444 if (rel == NULL)
445 {
447 continue;
448 }
449
450 /* functions in indexes may want a snapshot set */
452
453 /* Process this table */
454 cluster_rel(stmt->command, rel, rtc->indexOid, &params, isTopLevel);
455 /* cluster_rel closes the relation, but keeps lock */
456
459 }
460
461 /* Start a new transaction for the cleanup work. */
463
464 /* Clean up working storage */
466}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errhint(const char *fmt,...) pg_attribute_printf(1
#define elog(elevel,...)
Definition elog.h:228
#define AccessExclusiveLock
Definition lockdefs.h:43
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
MemoryContext PortalContext
Definition mcxt.c:176
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
int parser_errposition(ParseState *pstate, int location)
Definition parse_node.c:106
@ REPACK_COMMAND_REPACK
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
#define foreach_node(type, var, lst)
Definition pg_list.h:528
static List * get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid, bool rel_is_index, MemoryContext permcxt)
Definition repack.c:2244
static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, ClusterParams *params)
Definition repack.c:2335
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:509
static List * get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
Definition repack.c:2091
static Oid determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
Definition repack.c:2426
#define CLUOPT_ANALYZE
Definition repack.h:28
#define CLUOPT_RECHECK_ISCLUSTERED
Definition repack.h:27
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
Definition pg_list.h:54
Form_pg_class rd_rel
Definition rel.h:111
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:60
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207

References AccessExclusiveLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, check_index_is_clusterable(), CLUOPT_ANALYZE, CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_RECHECK_ISCLUSTERED, CLUOPT_VERBOSE, cluster_rel(), CommitTransactionCommand(), defGetBoolean(), determine_clustered_index(), elog, ereport, errcode(), errhint(), errmsg, ERROR, fb(), foreach_node, foreach_ptr, get_tables_to_repack(), get_tables_to_repack_partitioned(), GetTransactionSnapshot(), MemoryContextDelete(), OidIsValid, ClusterParams::options, parser_errposition(), PopActiveSnapshot(), PortalContext, PreventInTransactionBlock(), process_single_relation(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, REPACK_COMMAND_CLUSTER, REPACK_COMMAND_REPACK, RepackCommandAsString(), RepackLockLevel(), StartTransactionCommand(), stmt, table_close(), and try_table_open().

Referenced by standard_ProcessUtility().

◆ finish_heap_swap()

void finish_heap_swap ( Oid  OIDOldHeap,
Oid  OIDNewHeap,
bool  is_system_catalog,
bool  swap_toast_by_content,
bool  check_constraints,
bool  is_internal,
bool  reindex,
TransactionId  frozenXid,
MultiXactId  cutoffMulti,
char  newrelpersistence 
)
extern

Definition at line 1870 of file repack.c.

1879{
1880 ObjectAddress object;
1881 Oid mapped_tables[4];
1882 int i;
1883
1884 /* Report that we are now swapping relation files */
1887
1888 /* Zero out possible results from swapped_relation_files */
1889 memset(mapped_tables, 0, sizeof(mapped_tables));
1890
1891 /*
1892 * Swap the contents of the heap relations (including any toast tables).
1893 * Also set old heap's relfrozenxid to frozenXid.
1894 */
1897 swap_toast_by_content, is_internal,
1899
1900 /*
1901 * If it's a system catalog, queue a sinval message to flush all catcaches
1902 * on the catalog when we reach CommandCounterIncrement.
1903 */
1906
1907 if (reindex)
1908 {
1909 int reindex_flags;
1911
1912 /*
1913 * Rebuild each index on the relation (but not the toast table, which
1914 * is all-new at this point). It is important to do this before the
1915 * DROP step because if we are processing a system catalog that will
1916 * be used during DROP, we want to have its indexes available. There
1917 * is no advantage to the other order anyway because this is all
1918 * transactional, so no chance to reclaim disk space before commit. We
1919 * do not need a final CommandCounterIncrement() because
1920 * reindex_relation does it.
1921 *
1922 * Note: because index_build is called via reindex_relation, it will
1923 * never set indcheckxmin true for the indexes. This is OK even
1924 * though in some sense we are building new indexes rather than
1925 * rebuilding existing ones, because the new heap won't contain any
1926 * HOT chains at all, let alone broken ones, so it can't be necessary
1927 * to set indcheckxmin.
1928 */
1932
1933 /*
1934 * Ensure that the indexes have the same persistence as the parent
1935 * relation.
1936 */
1937 if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1939 else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1941
1942 /* Report that we are now reindexing relations */
1945
1947 }
1948
1949 /* Report that we are now doing clean up */
1952
1953 /*
1954 * If the relation being rebuilt is pg_class, swap_relation_files()
1955 * couldn't update pg_class's own pg_class entry (check comments in
1956 * swap_relation_files()), thus relfrozenxid was not updated. That's
1957 * annoying because a potential reason for doing a VACUUM FULL is an
1958 * imminent or actual anti-wraparound shutdown. So, now that we can
1959 * access the new relation using its indices, update relfrozenxid.
1960 * pg_class doesn't have a toast relation, so we don't need to update the
1961 * corresponding toast relation. Note that there's little point moving all
1962 * relfrozenxid updates here since swap_relation_files() needs to write to
1963 * pg_class for non-mapped relations anyway.
1964 */
1966 {
1970
1972
1975 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1977
1978 relform->relfrozenxid = frozenXid;
1979 relform->relminmxid = cutoffMulti;
1980
1982
1984 }
1985
1986 /* Destroy new heap with old filenumber */
1987 object.classId = RelationRelationId;
1988 object.objectId = OIDNewHeap;
1989 object.objectSubId = 0;
1990
1991 if (!reindex)
1992 {
1993 /*
1994 * Make sure the changes in pg_class are visible. This is especially
1995 * important if !swap_toast_by_content, so that the correct TOAST
1996 * relation is dropped. (reindex_relation() above did not help in this
1997 * case.)
1998 */
2000 }
2001
2002 /*
2003 * The new relation is local to our transaction and we know nothing
2004 * depends on it, so DROP_RESTRICT should be OK.
2005 */
2007
2008 /* performDeletion does CommandCounterIncrement at end */
2009
2010 /*
2011 * Now we must remove any relation mapping entries that we set up for the
2012 * transient table, as well as its toast table and toast index if any. If
2013 * we fail to do this before commit, the relmapper will complain about new
2014 * permanent map entries being added post-bootstrap.
2015 */
2016 for (i = 0; OidIsValid(mapped_tables[i]); i++)
2018
2019 /*
2020 * At this point, everything is kosher except that, if we did toast swap
2021 * by links, the toast table's name corresponds to the transient table.
2022 * The name is irrelevant to the backend because it's referenced by OID,
2023 * but users looking at the catalogs could be confused. Rename it to
2024 * prevent this problem.
2025 *
2026 * Note no lock required on the relation, because we already hold an
2027 * exclusive lock on it.
2028 */
2030 {
2032
2034 if (OidIsValid(newrel->rd_rel->reltoastrelid))
2035 {
2036 Oid toastidx;
2038
2039 /* Get the associated valid index to be renamed */
2040 toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2042
2043 /* rename the toast table ... */
2044 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2045 OIDOldHeap);
2046 RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2047 NewToastName, true, false);
2048
2049 /* ... and its valid index too. */
2050 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2051 OIDOldHeap);
2052
2054 NewToastName, true, true);
2055
2056 /*
2057 * Reset the relrewrite for the toast. The command-counter
2058 * increment is required here as we are about to update the tuple
2059 * that is updated as part of RenameRelationInternal.
2060 */
2062 ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2063 }
2065 }
2066
2067 /* if it's not a catalog table, clear any missing attribute settings */
2068 if (!is_system_catalog)
2069 {
2071
2075 }
2076}
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition dependency.c:279
#define PERFORM_DELETION_INTERNAL
Definition dependency.h:92
void RelationClearMissing(Relation rel)
Definition heap.c:1981
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
bool reindex_relation(const ReindexStmt *stmt, Oid relid, int flags, const ReindexParams *params)
Definition index.c:3969
#define REINDEX_REL_FORCE_INDEXES_UNLOGGED
Definition index.h:169
#define REINDEX_REL_SUPPRESS_INDEX_USE
Definition index.h:167
#define REINDEX_REL_FORCE_INDEXES_PERMANENT
Definition index.h:170
#define REINDEX_REL_CHECK_CONSTRAINTS
Definition index.h:168
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CacheInvalidateCatalog(Oid catalogId)
Definition inval.c:1612
int i
Definition isn.c:77
#define RowExclusiveLock
Definition lockdefs.h:38
@ DROP_RESTRICT
FormData_pg_class * Form_pg_class
Definition pg_class.h:160
#define NAMEDATALEN
#define snprintf
Definition port.h:261
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
#define PROGRESS_REPACK_PHASE
Definition progress.h:86
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES
Definition progress.h:104
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP
Definition progress.h:106
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX
Definition progress.h:105
void RelationMapRemoveMapping(Oid relationId)
Definition relmapper.c:439
static void swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, bool swap_toast_by_content, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, Oid *mapped_tables)
Definition repack.c:1488
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ResetRelRewrite(Oid myrelid)
Definition tablecmds.c:4420
void RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bool is_index)
Definition tablecmds.c:4327
Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock)
void CommandCounterIncrement(void)
Definition xact.c:1130

References AccessExclusiveLock, CacheInvalidateCatalog(), CatalogTupleUpdate(), CommandCounterIncrement(), DROP_RESTRICT, elog, ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, i, NAMEDATALEN, NoLock, ObjectIdGetDatum(), OidIsValid, PERFORM_DELETION_INTERNAL, performDeletion(), pgstat_progress_update_param(), PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP, PROGRESS_REPACK_PHASE_REBUILD_INDEX, PROGRESS_REPACK_PHASE_SWAP_REL_FILES, REINDEX_REL_CHECK_CONSTRAINTS, REINDEX_REL_FORCE_INDEXES_PERMANENT, REINDEX_REL_FORCE_INDEXES_UNLOGGED, REINDEX_REL_SUPPRESS_INDEX_USE, reindex_relation(), relation_close(), RelationClearMissing(), RelationMapRemoveMapping(), RenameRelationInternal(), ResetRelRewrite(), RowExclusiveLock, SearchSysCacheCopy1, snprintf, swap_relation_files(), table_close(), table_open(), and toast_get_valid_index().

Referenced by ATRewriteTables(), rebuild_relation(), rebuild_relation_finish_concurrent(), and refresh_by_heap_swap().

◆ HandleRepackMessageInterrupt()

void HandleRepackMessageInterrupt ( void  )
extern

Definition at line 3600 of file repack.c.

3601{
3602 InterruptPending = true;
3603 RepackMessagePending = true;
3605}
volatile sig_atomic_t InterruptPending
Definition globals.c:32
struct Latch * MyLatch
Definition globals.c:65
void SetLatch(Latch *latch)
Definition latch.c:290
volatile sig_atomic_t RepackMessagePending
Definition repack.c:150

References InterruptPending, MyLatch, RepackMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ make_new_heap()

Oid make_new_heap ( Oid  OIDOldHeap,
Oid  NewTableSpace,
Oid  NewAccessMethod,
char  relpersistence,
LOCKMODE  lockmode 
)
extern

Definition at line 1113 of file repack.c.

1115{
1119 Oid toastid;
1121 HeapTuple tuple;
1122 Datum reloptions;
1123 bool isNull;
1125
1126 OldHeap = table_open(OIDOldHeap, lockmode);
1128
1129 /*
1130 * Note that the NewHeap will not receive any of the defaults or
1131 * constraints associated with the OldHeap; we don't need 'em, and there's
1132 * no reason to spend cycles inserting them into the catalogs only to
1133 * delete them.
1134 */
1135
1136 /*
1137 * But we do want to use reloptions of the old heap for new heap.
1138 */
1140 if (!HeapTupleIsValid(tuple))
1141 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1142 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1143 &isNull);
1144 if (isNull)
1145 reloptions = (Datum) 0;
1146
1147 if (relpersistence == RELPERSISTENCE_TEMP)
1149 else
1151
1152 /*
1153 * Create the new heap, using a temporary name in the same namespace as
1154 * the existing table. NOTE: there is some risk of collision with user
1155 * relnames. Working around this seems more trouble than it's worth; in
1156 * particular, we can't create the new heap in a different namespace from
1157 * the old, or we will have problems with the TEMP status of temp tables.
1158 *
1159 * Note: the new heap is not a shared relation, even if we are rebuilding
1160 * a shared rel. However, we do make the new heap mapped if the source is
1161 * mapped. This simplifies swap_relation_files, and is absolutely
1162 * necessary for rebuilding pg_class, for reasons explained there.
1163 */
1164 snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1165
1169 InvalidOid,
1170 InvalidOid,
1171 InvalidOid,
1172 OldHeap->rd_rel->relowner,
1175 NIL,
1177 relpersistence,
1178 false,
1181 reloptions,
1182 false,
1183 true,
1184 true,
1185 OIDOldHeap,
1186 NULL);
1188
1189 ReleaseSysCache(tuple);
1190
1191 /*
1192 * Advance command counter so that the newly-created relation's catalog
1193 * tuples will be visible to table_open.
1194 */
1196
1197 /*
1198 * If necessary, create a TOAST table for the new relation.
1199 *
1200 * If the relation doesn't have a TOAST table already, we can't need one
1201 * for the new relation. The other way around is possible though: if some
1202 * wide columns have been dropped, NewHeapCreateToastTable can decide that
1203 * no TOAST table is needed for the new table.
1204 *
1205 * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1206 * that the TOAST table will be visible for insertion.
1207 */
1208 toastid = OldHeap->rd_rel->reltoastrelid;
1209 if (OidIsValid(toastid))
1210 {
1211 /* keep the existing toast table's reloptions, if any */
1213 if (!HeapTupleIsValid(tuple))
1214 elog(ERROR, "cache lookup failed for relation %u", toastid);
1215 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1216 &isNull);
1217 if (isNull)
1218 reloptions = (Datum) 0;
1219
1220 NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1221
1222 ReleaseSysCache(tuple);
1223 }
1224
1226
1227 return OIDNewHeap;
1228}
Oid heap_create_with_catalog(const char *relname, Oid relnamespace, Oid reltablespace, Oid relid, Oid reltypeid, Oid reloftypeid, Oid ownerid, Oid accessmtd, TupleDesc tupdesc, List *cooked_constraints, char relkind, char relpersistence, bool shared_relation, bool mapped_relation, OnCommitAction oncommit, Datum reloptions, bool use_user_acl, bool allow_system_table_mods, bool is_internal, Oid relrewrite, ObjectAddress *typaddress)
Definition heap.c:1137
Oid LookupCreationNamespace(const char *nspname)
Definition namespace.c:3500
#define NIL
Definition pg_list.h:68
@ ONCOMMIT_NOOP
Definition primnodes.h:59
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationIsMapped(relation)
Definition rel.h:565
#define RelationGetNamespace(relation)
Definition rel.h:557
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
void NewHeapCreateToastTable(Oid relOid, Datum reloptions, LOCKMODE lockmode, Oid OIDOldToast)
Definition toasting.c:65

References Assert, CommandCounterIncrement(), elog, ERROR, fb(), heap_create_with_catalog(), HeapTupleIsValid, InvalidOid, LookupCreationNamespace(), NAMEDATALEN, NewHeapCreateToastTable(), NIL, NoLock, ObjectIdGetDatum(), OidIsValid, ONCOMMIT_NOOP, RelationGetDescr, RelationGetNamespace, RelationIsMapped, ReleaseSysCache(), SearchSysCache1(), snprintf, SysCacheGetAttr(), table_close(), and table_open().

Referenced by ATRewriteTables(), rebuild_relation(), and RefreshMatViewByOid().

◆ mark_index_clustered()

void mark_index_clustered ( Relation  rel,
Oid  indexOid,
bool  is_internal 
)
extern

Definition at line 818 of file repack.c.

819{
824
825 Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
826
827 /*
828 * If the index is already marked clustered, no need to do anything.
829 */
830 if (OidIsValid(indexOid))
831 {
832 if (get_index_isclustered(indexOid))
833 return;
834 }
835
836 /*
837 * Check each index of the relation and set/clear the bit as needed.
838 */
840
841 foreach(index, RelationGetIndexList(rel))
842 {
844
848 elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
850
851 /*
852 * Unset the bit if set. We know it's wrong because we checked this
853 * earlier.
854 */
855 if (indexForm->indisclustered)
856 {
857 indexForm->indisclustered = false;
859 }
860 else if (thisIndexOid == indexOid)
861 {
862 /* this was checked earlier, but let's be real sure */
863 if (!indexForm->indisvalid)
864 elog(ERROR, "cannot cluster on invalid index %u", indexOid);
865 indexForm->indisclustered = true;
867 }
868
870 InvalidOid, is_internal);
871
873 }
874
876}
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
bool get_index_isclustered(Oid index_oid)
Definition lsyscache.c:3879
#define InvokeObjectPostAlterHookArg(classId, objectId, subId, auxiliaryId, is_internal)
END_CATALOG_STRUCT typedef FormData_pg_index * Form_pg_index
Definition pg_index.h:74
#define lfirst_oid(lc)
Definition pg_list.h:174
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4837

References Assert, CatalogTupleUpdate(), elog, ERROR, fb(), Form_pg_index, get_index_isclustered(), GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, InvalidOid, InvokeObjectPostAlterHookArg, lfirst_oid, ObjectIdGetDatum(), OidIsValid, RelationData::rd_rel, RelationGetIndexList(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ATExecClusterOn(), ATExecDropCluster(), and rebuild_relation().

◆ ProcessRepackMessages()

void ProcessRepackMessages ( void  )
extern

Definition at line 3611 of file repack.c.

3612{
3613 MemoryContext oldcontext;
3615
3616 /*
3617 * Nothing to do if we haven't launched the worker yet or have already
3618 * terminated it.
3619 */
3620 if (decoding_worker == NULL)
3621 return;
3622
3623 /*
3624 * This is invoked from ProcessInterrupts(), and since some of the
3625 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3626 * for recursive calls if more signals are received while this runs. It's
3627 * unclear that recursive entry would be safe, and it doesn't seem useful
3628 * even if it is safe, so let's block interrupts until done.
3629 */
3631
3632 /*
3633 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3634 * don't want to risk leaking data into long-lived contexts, so let's do
3635 * our work here in a private context that we can reset on each use.
3636 */
3637 if (hpm_context == NULL) /* first time through? */
3639 "ProcessRepackMessages",
3641 else
3643
3644 oldcontext = MemoryContextSwitchTo(hpm_context);
3645
3646 /* OK to process messages. Reset the flag saying there are more to do. */
3647 RepackMessagePending = false;
3648
3649 /*
3650 * Read as many messages as we can from the worker, but stop when no more
3651 * messages can be read from the worker without blocking.
3652 */
3653 while (true)
3654 {
3655 shm_mq_result res;
3656 Size nbytes;
3657 void *data;
3658
3660 &data, true);
3661 if (res == SHM_MQ_WOULD_BLOCK)
3662 break;
3663 else if (res == SHM_MQ_SUCCESS)
3664 {
3665 StringInfoData msg;
3666
3667 initStringInfo(&msg);
3668 appendBinaryStringInfo(&msg, data, nbytes);
3670 pfree(msg.data);
3671 }
3672 else
3673 {
3674 /*
3675 * The decoding worker is special in that it exits as soon as it
3676 * has its work done. Thus the DETACHED result code is fine.
3677 */
3678 Assert(res == SHM_MQ_DETACHED);
3679
3680 break;
3681 }
3682 }
3683
3684 MemoryContextSwitchTo(oldcontext);
3685
3686 /* Might as well clear the context on our way out */
3688
3690}
size_t Size
Definition c.h:689
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:406
void pfree(void *pointer)
Definition mcxt.c:1619
MemoryContext TopMemoryContext
Definition mcxt.c:167
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
const void * data
static void ProcessRepackMessage(StringInfo msg)
Definition repack.c:3696
static DecodingWorker * decoding_worker
Definition repack.c:144
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
shm_mq_handle * error_mqh
Definition repack.c:140

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), Assert, StringInfoData::data, data, decoding_worker, DecodingWorker::error_mqh, fb(), HOLD_INTERRUPTS, initStringInfo(), MemoryContextReset(), MemoryContextSwitchTo(), pfree(), ProcessRepackMessage(), RepackMessagePending, RESUME_INTERRUPTS, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ RepackWorkerMain()

void RepackWorkerMain ( Datum  main_arg)
extern

Definition at line 60 of file repack_worker.c.

61{
62 dsm_segment *seg;
64 shm_mq *mq;
67 SharedFileSet *sfs;
68 Snapshot snapshot;
69
70 am_repack_worker = true;
71
72 /*
73 * Override the default bgworker_die() with die() so we can use
74 * CHECK_FOR_INTERRUPTS().
75 */
78
80 if (seg == NULL)
83 errmsg("could not map dynamic shared memory segment"));
85
87
88 /* Arrange to signal the leader if we exit. */
90
91 /*
92 * Join locking group - see the comments around the call of
93 * start_repack_decoding_worker().
94 */
95 if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
96 return; /* The leader is not running anymore. */
97
98 /*
99 * Set up a queue to send error messages to the backend that launched this
100 * worker.
101 */
102 mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
104 mqh = shm_mq_attach(mq, seg, NULL);
107 shared->backend_proc_number);
108
109 /* Connect to the database. LOGIN is not required. */
112
113 /*
114 * Transaction is needed to open relation, and it also provides us with a
115 * resource owner.
116 */
118
120
121 /*
122 * Not sure the spinlock is needed here - the backend should not change
123 * anything in the shared memory until we have serialized the snapshot.
124 */
125 SpinLockAcquire(&shared->mutex);
127 sfs = &shared->sfs;
128 SpinLockRelease(&shared->mutex);
129
130 SharedFileSetAttach(sfs, seg);
131
132 /*
133 * Prepare to capture the concurrent data changes ourselves.
134 */
136
137 /* Announce that we're ready. */
138 SpinLockAcquire(&shared->mutex);
139 shared->initialized = true;
140 SpinLockRelease(&shared->mutex);
141 ConditionVariableSignal(&shared->cv);
142
143 /* There doesn't seem to be a nice API to set these */
145 XactReadOnly = true;
146
147 /* Build the initial snapshot and export it. */
148 snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
149 export_initial_snapshot(snapshot, shared);
150
151 /*
152 * Only historic snapshots should be used now. Do not let us restrict the
153 * progress of xmin horizon.
154 */
156
157 for (;;)
158 {
159 bool stop = decode_concurrent_changes(decoding_ctx, shared);
160
161 if (stop)
162 break;
163
164 }
165
166 /* Cleanup. */
169}
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
#define BGWORKER_BYPASS_ROLELOGINCHECK
Definition bgworker.h:167
#define BUFFERALIGN(LEN)
Definition c.h:898
void ConditionVariableSignal(ConditionVariable *cv)
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1103
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:673
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
#define die(msg)
#define pqsignal
Definition port.h:548
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
#define PointerGetDatum(X)
Definition postgres.h:354
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:85
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:56
static void RepackWorkerShutdown(int code, Datum arg)
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared)
static dsm_segment * worker_dsm_segment
static void export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
static LogicalDecodingContext * repack_setup_logical_decoding(Oid relid)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:458
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:71
bool BecomeLockGroupMember(PGPROC *leader, int pid)
Definition proc.c:2072
ConditionVariable cv
char error_queue[FLEXIBLE_ARRAY_MEMBER]
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References am_repack_worker, Assert, DecodingWorkerShared::backend_pid, DecodingWorkerShared::backend_proc, DecodingWorkerShared::backend_proc_number, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BGWORKER_BYPASS_ROLELOGINCHECK, BUFFERALIGN, CommitTransactionCommand(), ConditionVariableSignal(), DecodingWorkerShared::cv, DatumGetUInt32(), DecodingWorkerShared::dbid, decode_concurrent_changes(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg, ERROR, DecodingWorkerShared::error_queue, export_initial_snapshot(), fb(), DecodingWorkerShared::initialized, InvalidateCatalogSnapshot(), DecodingWorkerShared::lsn_upto, DecodingWorkerShared::mutex, MyProc, PointerGetDatum, pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, DecodingWorkerShared::relid, repack_cleanup_logical_decoding(), repack_setup_logical_decoding(), RepackWorkerShutdown(), DecodingWorkerShared::roleid, DecodingWorkerShared::sfs, SharedFileSetAttach(), shm_mq_attach(), shm_mq_set_sender(), SnapBuildInitialSnapshot(), SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), worker_dsm_segment, XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XLogRecPtrIsValid.

Variable Documentation

◆ RepackMessagePending

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
extern