PostgreSQL Source Code git master
Loading...
Searching...
No Matches
repack.h File Reference
#include <signal.h>
#include "nodes/parsenodes.h"
#include "parser/parse_node.h"
#include "storage/lockdefs.h"
#include "utils/relcache.h"
Include dependency graph for repack.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ClusterParams
 

Macros

#define CLUOPT_VERBOSE   0x01 /* print progress info */
 
#define CLUOPT_RECHECK   0x02 /* recheck relation state */
 
#define CLUOPT_RECHECK_ISCLUSTERED
 
#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */
 
#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */
 

Typedefs

typedef struct ClusterParams ClusterParams
 

Functions

void ExecRepack (ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
 
void cluster_rel (RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
 
void check_index_is_clusterable (Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
 
void mark_index_clustered (Relation rel, Oid indexOid, bool is_internal)
 
Oid make_new_heap (Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
 
void finish_heap_swap (Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
 
void HandleRepackMessageInterrupt (void)
 
void ProcessRepackMessages (void)
 
void RepackWorkerMain (Datum main_arg)
 
bool AmRepackWorker (void)
 

Variables

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
 

Macro Definition Documentation

◆ CLUOPT_ANALYZE

#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */

Definition at line 28 of file repack.h.

◆ CLUOPT_CONCURRENT

#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */

Definition at line 29 of file repack.h.

◆ CLUOPT_RECHECK

#define CLUOPT_RECHECK   0x02 /* recheck relation state */

Definition at line 26 of file repack.h.

◆ CLUOPT_RECHECK_ISCLUSTERED

#define CLUOPT_RECHECK_ISCLUSTERED
Value:
0x04 /* recheck relation state for
* indisclustered */

Definition at line 27 of file repack.h.

34{
35 uint32 options; /* bitmask of CLUOPT_* */
37
39
40
41extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
42
43extern void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
44 ClusterParams *params, bool isTopLevel);
46 LOCKMODE lockmode);
47extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
48
50 char relpersistence, LOCKMODE lockmode);
55 bool is_internal,
56 bool reindex,
59 char newrelpersistence);
60
61extern void HandleRepackMessageInterrupt(void);
62extern void ProcessRepackMessages(void);
63
64/* in repack_worker.c */
66extern bool AmRepackWorker(void);
67
68#endif /* REPACK_H */
#define PGDLLIMPORT
Definition c.h:1433
TransactionId MultiXactId
Definition c.h:746
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
#define stmt
int LOCKMODE
Definition lockdefs.h:26
RepackCommand
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
static int fb(int x)
bool AmRepackWorker(void)
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition repack.c:1911
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:769
void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
Definition repack.c:247
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:521
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
Definition repack.c:1154
void HandleRepackMessageInterrupt(void)
Definition repack.c:3655
void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
Definition repack.c:829
void RepackWorkerMain(Datum main_arg)
PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
Definition repack.c:152
void ProcessRepackMessages(void)
Definition repack.c:3666

◆ CLUOPT_VERBOSE

#define CLUOPT_VERBOSE   0x01 /* print progress info */

Definition at line 25 of file repack.h.

Typedef Documentation

◆ ClusterParams

Function Documentation

◆ AmRepackWorker()

bool AmRepackWorker ( void  )
extern

Definition at line 182 of file repack_worker.c.

183{
184 return am_repack_worker;
185}
static bool am_repack_worker

References am_repack_worker.

Referenced by mq_putmessage().

◆ check_index_is_clusterable()

void check_index_is_clusterable ( Relation  OldHeap,
Oid  indexOid,
LOCKMODE  lockmode 
)
extern

Definition at line 769 of file repack.c.

770{
772
773 OldIndex = index_open(indexOid, lockmode);
774
775 /*
776 * Check that index is in fact an index on the given relation
777 */
778 if (OldIndex->rd_index == NULL ||
779 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
782 errmsg("\"%s\" is not an index for table \"%s\"",
785
786 /* Index AM must allow clustering */
787 if (!OldIndex->rd_indam->amclusterable)
790 errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
792
793 /*
794 * Disallow clustering on incomplete indexes (those that might not index
795 * every row of the relation). We could relax this by making a separate
796 * seqscan pass over the table to copy the missing rows, but that seems
797 * expensive and tedious.
798 */
799 if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
802 errmsg("cannot cluster on partial index \"%s\"",
804
805 /*
806 * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
807 * it might well not contain entries for every heap row, or might not even
808 * be internally consistent. (But note that we don't check indcheckxmin;
809 * the worst consequence of following broken HOT chains would be that we
810 * might put recently-dead tuples out-of-order in the new table, and there
811 * is little harm in that.)
812 */
813 if (!OldIndex->rd_index->indisvalid)
816 errmsg("cannot cluster on invalid index \"%s\"",
818
819 /* Drop relcache refcnt on OldIndex, but keep lock */
821}
int errcode(int sqlerrcode)
Definition elog.c:875
#define ERROR
Definition elog.h:40
#define ereport(elevel,...)
Definition elog.h:152
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition heaptuple.c:456
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
#define NoLock
Definition lockdefs.h:34
static char * errmsg
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetRelationName(relation)
Definition rel.h:550

References ereport, errcode(), errmsg, ERROR, fb(), heap_attisnull(), index_close(), index_open(), NoLock, RelationGetRelationName, and RelationGetRelid.

Referenced by ATExecClusterOn(), cluster_rel(), ExecRepack(), and process_single_relation().

◆ cluster_rel()

void cluster_rel ( RepackCommand  cmd,
Relation  OldHeap,
Oid  indexOid,
ClusterParams params,
bool  isTopLevel 
)
extern

Definition at line 521 of file repack.c.

523{
524 Oid tableOid = RelationGetRelid(OldHeap);
527 Oid save_userid;
528 int save_sec_context;
529 int save_nestlevel;
530 bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
531 bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
532 bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
534
535 /* Determine the lock mode to use. */
536 lmode = RepackLockLevel(concurrent);
537
538 /*
539 * Check some preconditions in the concurrent case. This also obtains the
540 * replica index OID.
541 */
542 if (concurrent)
544
545 /* Check for user-requested abort. */
547
550
551 /*
552 * Switch to the table owner's userid, so that any index functions are run
553 * as that user. Also lock down security-restricted operations and
554 * arrange to make GUC variable changes local to this command.
555 */
556 GetUserIdAndSecContext(&save_userid, &save_sec_context);
557 SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
558 save_sec_context | SECURITY_RESTRICTED_OPERATION);
559 save_nestlevel = NewGUCNestLevel();
561
562 /*
563 * Recheck that the relation is still what it was when we started.
564 *
565 * Note that it's critical to skip this in single-relation CLUSTER;
566 * otherwise, we would reject an attempt to cluster using a
567 * not-previously-clustered index.
568 */
569 if (recheck &&
570 !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
571 lmode, params->options))
572 goto out;
573
574 /*
575 * We allow repacking shared catalogs only when not using an index. It
576 * would work to use an index in most respects, but the index would only
577 * get marked as indisclustered in the current database, leading to
578 * unexpected behavior if CLUSTER were later invoked in another database.
579 */
580 if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
583 /*- translator: first %s is name of a SQL command, eg. REPACK */
584 errmsg("cannot execute %s on a shared catalog",
586
587 /*
588 * The CONCURRENTLY case should have been rejected earlier because it does
589 * not support system catalogs.
590 */
591 Assert(!(OldHeap->rd_rel->relisshared && concurrent));
592
593 /*
594 * Don't process temp tables of other backends ... their local buffer
595 * manager is not going to cope.
596 */
600 /*- translator: first %s is name of a SQL command, eg. REPACK */
601 errmsg("cannot execute %s on temporary tables of other sessions",
603
604 /*
605 * Also check for active uses of the relation in the current transaction,
606 * including open scans and pending AFTER trigger events.
607 */
609
610 /* Check heap and index are valid to cluster on */
611 if (OidIsValid(indexOid))
612 {
613 /* verify the index is good and lock it */
615 /* also open it */
616 index = index_open(indexOid, NoLock);
617 }
618 else
619 index = NULL;
620
621 /*
622 * When allow_system_table_mods is turned off, we disallow repacking a
623 * catalog on a particular index unless that's already the clustered index
624 * for that catalog.
625 *
626 * XXX We don't check for this in CLUSTER, because it's historically been
627 * allowed.
628 */
629 if (cmd != REPACK_COMMAND_CLUSTER &&
630 !allowSystemTableMods && OidIsValid(indexOid) &&
631 IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
634 errmsg("permission denied: \"%s\" is a system catalog",
636 errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
637 "allow_system_table_mods"));
638
639 /*
640 * Quietly ignore the request if this is a materialized view which has not
641 * been populated from its query. No harm is done because there is no data
642 * to deal with, and we don't want to throw an error if this is part of a
643 * multi-relation request -- for example, CLUSTER was run on the entire
644 * database.
645 */
646 if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
648 {
649 if (index)
652 goto out;
653 }
654
655 Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
656 OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
657 OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
658
659 /*
660 * All predicate locks on the tuples or pages are about to be made
661 * invalid, because we move tuples around. Promote them to relation
662 * locks. Predicate locks on indexes will be promoted when they are
663 * reindexed.
664 *
665 * During concurrent processing, the heap as well as its indexes stay in
666 * operation, so we postpone this step until they are locked using
667 * AccessExclusiveLock near the end of the processing.
668 */
669 if (!concurrent)
671
672 /*
673 * rebuild_relation does all the dirty work, and closes OldHeap and index,
674 * if valid.
675 *
676 * In concurrent mode, make sure the worker terminates; normally it does
677 * so by itself, but a PG_ENSURE_ERROR_CLEANUP callback ensures that this
678 * happens even in case this backend dies early on a FATAL exit. Normal
679 * mode doesn't need that overhead.
680 */
681 if (concurrent)
682 {
684 {
686 }
689 }
690 else
692
693out:
694 /* Roll back any GUC changes executed by index functions */
695 AtEOXact_GUC(false, save_nestlevel);
696
697 /* Restore userid and security context */
698 SetUserIdAndSecContext(save_userid, save_sec_context);
699
701}
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_param(int index, int64 val)
void pgstat_progress_end_command(void)
@ PROGRESS_COMMAND_REPACK
#define Assert(condition)
Definition c.h:943
#define OidIsValid(objectId)
Definition c.h:858
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
int errdetail(const char *fmt,...) pg_attribute_printf(1
bool allowSystemTableMods
Definition globals.c:132
int NewGUCNestLevel(void)
Definition guc.c:2142
void RestrictSearchPath(void)
Definition guc.c:2153
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition guc.c:2169
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:52
#define SECURITY_RESTRICTED_OPERATION
Definition miscadmin.h:322
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition miscinit.c:613
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition miscinit.c:620
@ REPACK_COMMAND_CLUSTER
static int verbose
#define InvalidOid
void TransferPredicateLocksToHeapRelation(Relation relation)
Definition predicate.c:3052
#define PROGRESS_REPACK_COMMAND
Definition progress.h:85
#define RelationIsPopulated(relation)
Definition rel.h:697
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:678
static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid, Oid userid, LOCKMODE lmode, int options)
Definition repack.c:708
static void check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
Definition repack.c:895
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:769
static void stop_repack_decoding_worker(void)
Definition repack.c:3530
static LOCKMODE RepackLockLevel(bool concurrent)
Definition repack.c:488
static const char * RepackCommandAsString(RepackCommand cmd)
Definition repack.c:2510
static void stop_repack_decoding_worker_cb(int code, Datum arg)
Definition repack.c:3575
static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid ident_idx)
Definition repack.c:1007
#define CLUOPT_VERBOSE
Definition repack.h:25
#define CLUOPT_CONCURRENT
Definition repack.h:29
#define CLUOPT_RECHECK
Definition repack.h:26
void relation_close(Relation relation, LOCKMODE lockmode)
Definition relation.c:206
uint32 options
Definition repack.h:34
Definition type.h:97
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition tablecmds.c:4473

References allowSystemTableMods, Assert, AtEOXact_GUC(), check_concurrent_repack_requirements(), CHECK_FOR_INTERRUPTS, check_index_is_clusterable(), CheckTableNotInUse(), CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_VERBOSE, cluster_rel_recheck(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetUserIdAndSecContext(), index_close(), index_open(), InvalidOid, IsCatalogRelation(), NewGUCNestLevel(), NoLock, OidIsValid, ClusterParams::options, PG_END_ENSURE_ERROR_CLEANUP, PG_ENSURE_ERROR_CLEANUP, pgstat_progress_end_command(), pgstat_progress_start_command(), pgstat_progress_update_param(), PROGRESS_COMMAND_REPACK, PROGRESS_REPACK_COMMAND, rebuild_relation(), relation_close(), RELATION_IS_OTHER_TEMP, RelationGetRelationName, RelationGetRelid, RelationIsPopulated, REPACK_COMMAND_CLUSTER, RepackCommandAsString(), RepackLockLevel(), RestrictSearchPath(), SECURITY_RESTRICTED_OPERATION, SetUserIdAndSecContext(), stop_repack_decoding_worker(), stop_repack_decoding_worker_cb(), TransferPredicateLocksToHeapRelation(), and verbose.

Referenced by ExecRepack(), process_single_relation(), and vacuum_rel().

◆ ExecRepack()

void ExecRepack ( ParseState pstate,
RepackStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 247 of file repack.c.

248{
249 ClusterParams params = {0};
250 Relation rel = NULL;
252 LOCKMODE lockmode;
253 List *rtcs;
254 bool verbose = false;
255 bool analyze = false;
256 bool concurrently = false;
257
258 /* Parse option list */
259 foreach_node(DefElem, opt, stmt->params)
260 {
261 if (strcmp(opt->defname, "verbose") == 0)
262 verbose = defGetBoolean(opt);
263 else if (strcmp(opt->defname, "analyze") == 0 ||
264 strcmp(opt->defname, "analyse") == 0)
265 analyze = defGetBoolean(opt);
266 else if (strcmp(opt->defname, "concurrently") == 0)
267 {
268 if (stmt->command != REPACK_COMMAND_REPACK)
271 errmsg("CONCURRENTLY option not supported for %s",
272 RepackCommandAsString(stmt->command)));
274 }
275 else
278 errmsg("unrecognized %s option \"%s\"",
279 RepackCommandAsString(stmt->command),
280 opt->defname),
281 parser_errposition(pstate, opt->location));
282 }
283
284 params.options |=
285 (verbose ? CLUOPT_VERBOSE : 0) |
286 (analyze ? CLUOPT_ANALYZE : 0) |
288
289 /* Determine the lock mode to use. */
290 lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
291
292 if ((params.options & CLUOPT_CONCURRENT) != 0)
293 {
294 /*
295 * Make sure we're not in a transaction block.
296 *
297 * The reason is that repack_setup_logical_decoding() could wait
298 * indefinitely for our XID to complete. (The deadlock detector would
299 * not recognize it because we'd be waiting for ourselves, i.e. no
300 * real lock conflict.) It would be possible to run in a transaction
301 * block if we had no XID, but this restriction is simpler for users
302 * to understand and we don't lose any functionality.
303 */
304 PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
305 }
306
307 /*
308 * If a single relation is specified, process it and we're done ... unless
309 * the relation is a partitioned table, in which case we fall through.
310 */
311 if (stmt->relation != NULL)
312 {
313 rel = process_single_relation(stmt, lockmode, isTopLevel, &params);
314 if (rel == NULL)
315 return; /* all done */
316 }
317
318 /*
319 * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
320 * can add support for this later.
321 */
322 if (params.options & CLUOPT_ANALYZE)
325 errmsg("cannot execute %s on multiple tables",
326 "REPACK (ANALYZE)"));
327
328 /*
329 * By here, we know we are in a multi-table situation.
330 *
331 * Concurrent processing is currently considered rather special (e.g. in
332 * terms of resources consumed) so it is not performed in bulk.
333 */
334 if (params.options & CLUOPT_CONCURRENT)
335 {
336 if (rel != NULL)
337 {
338 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
341 errmsg("%s is not supported for partitioned tables",
342 "REPACK (CONCURRENTLY)"),
343 errhint("Consider running the command on individual partitions."));
344 }
345 else
348 errmsg("%s requires an explicit table name",
349 "REPACK (CONCURRENTLY)"));
350 }
351
352 /*
353 * In order to avoid holding locks for too long, we want to process each
354 * table in its own transaction. This forces us to disallow running
355 * inside a user transaction block.
356 */
358
359 /* Also, we need a memory context to hold our list of relations */
361 "Repack",
363
364 /*
365 * Since we open a new transaction for each relation, we have to check
366 * that the relation still is what we think it is.
367 *
368 * In single-transaction CLUSTER, we don't need the overhead.
369 */
370 params.options |= CLUOPT_RECHECK;
371
372 /*
373 * If we don't have a relation yet, determine a relation list. If we do,
374 * then it must be a partitioned table, and we want to process its
375 * partitions.
376 */
377 if (rel == NULL)
378 {
379 Assert(stmt->indexname == NULL);
380 rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
383 }
384 else
385 {
386 Oid relid;
387 bool rel_is_index;
388
389 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
390
391 /*
392 * If USING INDEX was specified, resolve the index name now and pass
393 * it down.
394 */
395 if (stmt->usingindex)
396 {
397 /*
398 * If no index name was specified when repacking a partitioned
399 * table, punt for now. Maybe we can improve this later.
400 */
401 if (!stmt->indexname)
402 {
403 if (stmt->command == REPACK_COMMAND_CLUSTER)
406 errmsg("there is no previously clustered index for table \"%s\"",
408 else
411 /*- translator: first %s is name of a SQL command, eg. REPACK */
412 errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
413 RepackCommandAsString(stmt->command),
415 }
416
417 relid = determine_clustered_index(rel, stmt->usingindex,
418 stmt->indexname);
419 if (!OidIsValid(relid))
420 elog(ERROR, "unable to determine index to cluster on");
422
423 rel_is_index = true;
424 }
425 else
426 {
427 relid = RelationGetRelid(rel);
428 rel_is_index = false;
429 }
430
432 relid, rel_is_index,
434
435 /* close parent relation, releasing lock on it */
437 rel = NULL;
438 }
439
440 /* Commit to get out of starting transaction */
443
444 /* Cluster the tables, each in a separate transaction */
445 Assert(rel == NULL);
447 {
448 /* Start a new transaction for each relation. */
450
451 /*
452 * Open the target table, coping with the case where it has been
453 * dropped.
454 */
455 rel = try_table_open(rtc->tableOid, lockmode);
456 if (rel == NULL)
457 {
459 continue;
460 }
461
462 /* functions in indexes may want a snapshot set */
464
465 /* Process this table */
466 cluster_rel(stmt->command, rel, rtc->indexOid, &params, isTopLevel);
467 /* cluster_rel closes the relation, but keeps lock */
468
471 }
472
473 /* Start a new transaction for the cleanup work. */
475
476 /* Clean up working storage */
478}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errhint(const char *fmt,...) pg_attribute_printf(1
#define elog(elevel,...)
Definition elog.h:228
#define AccessExclusiveLock
Definition lockdefs.h:43
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
MemoryContext PortalContext
Definition mcxt.c:176
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
int parser_errposition(ParseState *pstate, int location)
Definition parse_node.c:106
@ REPACK_COMMAND_REPACK
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
#define foreach_node(type, var, lst)
Definition pg_list.h:528
static long analyze(struct nfa *nfa)
Definition regc_nfa.c:3051
static List * get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid, bool rel_is_index, MemoryContext permcxt)
Definition repack.c:2285
static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, ClusterParams *params)
Definition repack.c:2376
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:521
static List * get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
Definition repack.c:2132
static Oid determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
Definition repack.c:2467
#define CLUOPT_ANALYZE
Definition repack.h:28
#define CLUOPT_RECHECK_ISCLUSTERED
Definition repack.h:27
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
Definition pg_list.h:54
Form_pg_class rd_rel
Definition rel.h:111
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:60
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207

References AccessExclusiveLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, analyze(), Assert, check_index_is_clusterable(), CLUOPT_ANALYZE, CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_RECHECK_ISCLUSTERED, CLUOPT_VERBOSE, cluster_rel(), CommitTransactionCommand(), defGetBoolean(), determine_clustered_index(), elog, ereport, errcode(), errhint(), errmsg, ERROR, fb(), foreach_node, foreach_ptr, get_tables_to_repack(), get_tables_to_repack_partitioned(), GetTransactionSnapshot(), MemoryContextDelete(), OidIsValid, ClusterParams::options, parser_errposition(), PopActiveSnapshot(), PortalContext, PreventInTransactionBlock(), process_single_relation(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, REPACK_COMMAND_CLUSTER, REPACK_COMMAND_REPACK, RepackCommandAsString(), RepackLockLevel(), StartTransactionCommand(), stmt, table_close(), try_table_open(), and verbose.

Referenced by standard_ProcessUtility().

◆ finish_heap_swap()

void finish_heap_swap ( Oid  OIDOldHeap,
Oid  OIDNewHeap,
bool  is_system_catalog,
bool  swap_toast_by_content,
bool  check_constraints,
bool  is_internal,
bool  reindex,
TransactionId  frozenXid,
MultiXactId  cutoffMulti,
char  newrelpersistence 
)
extern

Definition at line 1911 of file repack.c.

1920{
1921 ObjectAddress object;
1922 Oid mapped_tables[4];
1923 int i;
1924
1925 /* Report that we are now swapping relation files */
1928
1929 /* Zero out possible results from swapped_relation_files */
1930 memset(mapped_tables, 0, sizeof(mapped_tables));
1931
1932 /*
1933 * Swap the contents of the heap relations (including any toast tables).
1934 * Also set old heap's relfrozenxid to frozenXid.
1935 */
1938 swap_toast_by_content, is_internal,
1940
1941 /*
1942 * If it's a system catalog, queue a sinval message to flush all catcaches
1943 * on the catalog when we reach CommandCounterIncrement.
1944 */
1947
1948 if (reindex)
1949 {
1950 int reindex_flags;
1952
1953 /*
1954 * Rebuild each index on the relation (but not the toast table, which
1955 * is all-new at this point). It is important to do this before the
1956 * DROP step because if we are processing a system catalog that will
1957 * be used during DROP, we want to have its indexes available. There
1958 * is no advantage to the other order anyway because this is all
1959 * transactional, so no chance to reclaim disk space before commit. We
1960 * do not need a final CommandCounterIncrement() because
1961 * reindex_relation does it.
1962 *
1963 * Note: because index_build is called via reindex_relation, it will
1964 * never set indcheckxmin true for the indexes. This is OK even
1965 * though in some sense we are building new indexes rather than
1966 * rebuilding existing ones, because the new heap won't contain any
1967 * HOT chains at all, let alone broken ones, so it can't be necessary
1968 * to set indcheckxmin.
1969 */
1973
1974 /*
1975 * Ensure that the indexes have the same persistence as the parent
1976 * relation.
1977 */
1978 if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1980 else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1982
1983 /* Report that we are now reindexing relations */
1986
1988 }
1989
1990 /* Report that we are now doing clean up */
1993
1994 /*
1995 * If the relation being rebuilt is pg_class, swap_relation_files()
1996 * couldn't update pg_class's own pg_class entry (check comments in
1997 * swap_relation_files()), thus relfrozenxid was not updated. That's
1998 * annoying because a potential reason for doing a VACUUM FULL is a
1999 * imminent or actual anti-wraparound shutdown. So, now that we can
2000 * access the new relation using its indices, update relfrozenxid.
2001 * pg_class doesn't have a toast relation, so we don't need to update the
2002 * corresponding toast relation. Not that there's little point moving all
2003 * relfrozenxid updates here since swap_relation_files() needs to write to
2004 * pg_class for non-mapped relations anyway.
2005 */
2007 {
2011
2013
2016 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
2018
2019 relform->relfrozenxid = frozenXid;
2020 relform->relminmxid = cutoffMulti;
2021
2023
2025 }
2026
2027 /* Destroy new heap with old filenumber */
2028 object.classId = RelationRelationId;
2029 object.objectId = OIDNewHeap;
2030 object.objectSubId = 0;
2031
2032 if (!reindex)
2033 {
2034 /*
2035 * Make sure the changes in pg_class are visible. This is especially
2036 * important if !swap_toast_by_content, so that the correct TOAST
2037 * relation is dropped. (reindex_relation() above did not help in this
2038 * case))
2039 */
2041 }
2042
2043 /*
2044 * The new relation is local to our transaction and we know nothing
2045 * depends on it, so DROP_RESTRICT should be OK.
2046 */
2048
2049 /* performDeletion does CommandCounterIncrement at end */
2050
2051 /*
2052 * Now we must remove any relation mapping entries that we set up for the
2053 * transient table, as well as its toast table and toast index if any. If
2054 * we fail to do this before commit, the relmapper will complain about new
2055 * permanent map entries being added post-bootstrap.
2056 */
2057 for (i = 0; OidIsValid(mapped_tables[i]); i++)
2059
2060 /*
2061 * At this point, everything is kosher except that, if we did toast swap
2062 * by links, the toast table's name corresponds to the transient table.
2063 * The name is irrelevant to the backend because it's referenced by OID,
2064 * but users looking at the catalogs could be confused. Rename it to
2065 * prevent this problem.
2066 *
2067 * Note no lock required on the relation, because we already hold an
2068 * exclusive lock on it.
2069 */
2071 {
2073
2075 if (OidIsValid(newrel->rd_rel->reltoastrelid))
2076 {
2077 Oid toastidx;
2079
2080 /* Get the associated valid index to be renamed */
2081 toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2083
2084 /* rename the toast table ... */
2085 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2086 OIDOldHeap);
2087 RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2088 NewToastName, true, false);
2089
2090 /* ... and its valid index too. */
2091 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2092 OIDOldHeap);
2093
2095 NewToastName, true, true);
2096
2097 /*
2098 * Reset the relrewrite for the toast. The command-counter
2099 * increment is required here as we are about to update the tuple
2100 * that is updated as part of RenameRelationInternal.
2101 */
2103 ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2104 }
2106 }
2107
2108 /* if it's not a catalog table, clear any missing attribute settings */
2109 if (!is_system_catalog)
2110 {
2112
2116 }
2117}
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition dependency.c:279
#define PERFORM_DELETION_INTERNAL
Definition dependency.h:92
void RelationClearMissing(Relation rel)
Definition heap.c:1984
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
bool reindex_relation(const ReindexStmt *stmt, Oid relid, int flags, const ReindexParams *params)
Definition index.c:3969
#define REINDEX_REL_FORCE_INDEXES_UNLOGGED
Definition index.h:169
#define REINDEX_REL_SUPPRESS_INDEX_USE
Definition index.h:167
#define REINDEX_REL_FORCE_INDEXES_PERMANENT
Definition index.h:170
#define REINDEX_REL_CHECK_CONSTRAINTS
Definition index.h:168
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CacheInvalidateCatalog(Oid catalogId)
Definition inval.c:1609
int i
Definition isn.c:77
#define RowExclusiveLock
Definition lockdefs.h:38
@ DROP_RESTRICT
FormData_pg_class * Form_pg_class
Definition pg_class.h:160
#define NAMEDATALEN
#define snprintf
Definition port.h:261
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
#define PROGRESS_REPACK_PHASE
Definition progress.h:86
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES
Definition progress.h:104
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP
Definition progress.h:106
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX
Definition progress.h:105
void RelationMapRemoveMapping(Oid relationId)
Definition relmapper.c:439
static void swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, bool swap_toast_by_content, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, Oid *mapped_tables)
Definition repack.c:1529
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ResetRelRewrite(Oid myrelid)
Definition tablecmds.c:4420
void RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bool is_index)
Definition tablecmds.c:4327
Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock)
void CommandCounterIncrement(void)
Definition xact.c:1130

References AccessExclusiveLock, CacheInvalidateCatalog(), CatalogTupleUpdate(), CommandCounterIncrement(), DROP_RESTRICT, elog, ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, i, NAMEDATALEN, NoLock, ObjectIdGetDatum(), OidIsValid, PERFORM_DELETION_INTERNAL, performDeletion(), pgstat_progress_update_param(), PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP, PROGRESS_REPACK_PHASE_REBUILD_INDEX, PROGRESS_REPACK_PHASE_SWAP_REL_FILES, REINDEX_REL_CHECK_CONSTRAINTS, REINDEX_REL_FORCE_INDEXES_PERMANENT, REINDEX_REL_FORCE_INDEXES_UNLOGGED, REINDEX_REL_SUPPRESS_INDEX_USE, reindex_relation(), relation_close(), RelationClearMissing(), RelationMapRemoveMapping(), RenameRelationInternal(), ResetRelRewrite(), RowExclusiveLock, SearchSysCacheCopy1, snprintf, swap_relation_files(), table_close(), table_open(), and toast_get_valid_index().

Referenced by ATRewriteTables(), rebuild_relation(), rebuild_relation_finish_concurrent(), and refresh_by_heap_swap().

◆ HandleRepackMessageInterrupt()

void HandleRepackMessageInterrupt ( void  )
extern

Definition at line 3655 of file repack.c.

3656{
3657 InterruptPending = true;
3658 RepackMessagePending = true;
3660}
volatile sig_atomic_t InterruptPending
Definition globals.c:32
struct Latch * MyLatch
Definition globals.c:65
void SetLatch(Latch *latch)
Definition latch.c:290
volatile sig_atomic_t RepackMessagePending
Definition repack.c:152

References InterruptPending, MyLatch, RepackMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ make_new_heap()

Oid make_new_heap ( Oid  OIDOldHeap,
Oid  NewTableSpace,
Oid  NewAccessMethod,
char  relpersistence,
LOCKMODE  lockmode 
)
extern

Definition at line 1154 of file repack.c.

1156{
1160 Oid toastid;
1162 HeapTuple tuple;
1163 Datum reloptions;
1164 bool isNull;
1166
1167 OldHeap = table_open(OIDOldHeap, lockmode);
1169
1170 /*
1171 * Note that the NewHeap will not receive any of the defaults or
1172 * constraints associated with the OldHeap; we don't need 'em, and there's
1173 * no reason to spend cycles inserting them into the catalogs only to
1174 * delete them.
1175 */
1176
1177 /*
1178 * But we do want to use reloptions of the old heap for new heap.
1179 */
1181 if (!HeapTupleIsValid(tuple))
1182 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1183 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1184 &isNull);
1185 if (isNull)
1186 reloptions = (Datum) 0;
1187
1188 if (relpersistence == RELPERSISTENCE_TEMP)
1190 else
1192
1193 /*
1194 * Create the new heap, using a temporary name in the same namespace as
1195 * the existing table. NOTE: there is some risk of collision with user
1196 * relnames. Working around this seems more trouble than it's worth; in
1197 * particular, we can't create the new heap in a different namespace from
1198 * the old, or we will have problems with the TEMP status of temp tables.
1199 *
1200 * Note: the new heap is not a shared relation, even if we are rebuilding
1201 * a shared rel. However, we do make the new heap mapped if the source is
1202 * mapped. This simplifies swap_relation_files, and is absolutely
1203 * necessary for rebuilding pg_class, for reasons explained there.
1204 */
1205 snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1206
1210 InvalidOid,
1211 InvalidOid,
1212 InvalidOid,
1213 OldHeap->rd_rel->relowner,
1216 NIL,
1218 relpersistence,
1219 false,
1222 reloptions,
1223 false,
1224 true,
1225 true,
1226 OIDOldHeap,
1227 NULL);
1229
1230 ReleaseSysCache(tuple);
1231
1232 /*
1233 * Advance command counter so that the newly-created relation's catalog
1234 * tuples will be visible to table_open.
1235 */
1237
1238 /*
1239 * If necessary, create a TOAST table for the new relation.
1240 *
1241 * If the relation doesn't have a TOAST table already, we can't need one
1242 * for the new relation. The other way around is possible though: if some
1243 * wide columns have been dropped, NewHeapCreateToastTable can decide that
1244 * no TOAST table is needed for the new table.
1245 *
1246 * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1247 * that the TOAST table will be visible for insertion.
1248 */
1249 toastid = OldHeap->rd_rel->reltoastrelid;
1250 if (OidIsValid(toastid))
1251 {
1252 /* keep the existing toast table's reloptions, if any */
1254 if (!HeapTupleIsValid(tuple))
1255 elog(ERROR, "cache lookup failed for relation %u", toastid);
1256 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1257 &isNull);
1258 if (isNull)
1259 reloptions = (Datum) 0;
1260
1261 NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1262
1263 ReleaseSysCache(tuple);
1264 }
1265
1267
1268 return OIDNewHeap;
1269}
Oid heap_create_with_catalog(const char *relname, Oid relnamespace, Oid reltablespace, Oid relid, Oid reltypeid, Oid reloftypeid, Oid ownerid, Oid accessmtd, TupleDesc tupdesc, List *cooked_constraints, char relkind, char relpersistence, bool shared_relation, bool mapped_relation, OnCommitAction oncommit, Datum reloptions, bool use_user_acl, bool allow_system_table_mods, bool is_internal, Oid relrewrite, ObjectAddress *typaddress)
Definition heap.c:1140
Oid LookupCreationNamespace(const char *nspname)
Definition namespace.c:3500
#define NIL
Definition pg_list.h:68
@ ONCOMMIT_NOOP
Definition primnodes.h:59
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationIsMapped(relation)
Definition rel.h:565
#define RelationGetNamespace(relation)
Definition rel.h:557
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
void NewHeapCreateToastTable(Oid relOid, Datum reloptions, LOCKMODE lockmode, Oid OIDOldToast)
Definition toasting.c:65

References Assert, CommandCounterIncrement(), elog, ERROR, fb(), heap_create_with_catalog(), HeapTupleIsValid, InvalidOid, LookupCreationNamespace(), NAMEDATALEN, NewHeapCreateToastTable(), NIL, NoLock, ObjectIdGetDatum(), OidIsValid, ONCOMMIT_NOOP, RelationGetDescr, RelationGetNamespace, RelationIsMapped, ReleaseSysCache(), SearchSysCache1(), snprintf, SysCacheGetAttr(), table_close(), and table_open().

Referenced by ATRewriteTables(), rebuild_relation(), and RefreshMatViewByOid().

◆ mark_index_clustered()

void mark_index_clustered ( Relation  rel,
Oid  indexOid,
bool  is_internal 
)
extern

Definition at line 829 of file repack.c.

830{
835
836 Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
837
838 /*
839 * If the index is already marked clustered, no need to do anything.
840 */
841 if (OidIsValid(indexOid))
842 {
843 if (get_index_isclustered(indexOid))
844 return;
845 }
846
847 /*
848 * Check each index of the relation and set/clear the bit as needed.
849 */
851
852 foreach(index, RelationGetIndexList(rel))
853 {
855
859 elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
861
862 /*
863 * Unset the bit if set. We know it's wrong because we checked this
864 * earlier.
865 */
866 if (indexForm->indisclustered)
867 {
868 indexForm->indisclustered = false;
870 }
871 else if (thisIndexOid == indexOid)
872 {
873 /* this was checked earlier, but let's be real sure */
874 if (!indexForm->indisvalid)
875 elog(ERROR, "cannot cluster on invalid index %u", indexOid);
876 indexForm->indisclustered = true;
878 }
879
881 InvalidOid, is_internal);
882
884 }
885
887}
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
bool get_index_isclustered(Oid index_oid)
Definition lsyscache.c:3954
#define InvokeObjectPostAlterHookArg(classId, objectId, subId, auxiliaryId, is_internal)
END_CATALOG_STRUCT typedef FormData_pg_index * Form_pg_index
Definition pg_index.h:74
#define lfirst_oid(lc)
Definition pg_list.h:174
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4837

References Assert, CatalogTupleUpdate(), elog, ERROR, fb(), Form_pg_index, get_index_isclustered(), GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, InvalidOid, InvokeObjectPostAlterHookArg, lfirst_oid, ObjectIdGetDatum(), OidIsValid, RelationData::rd_rel, RelationGetIndexList(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ATExecClusterOn(), ATExecDropCluster(), and rebuild_relation().

◆ ProcessRepackMessages()

void ProcessRepackMessages ( void  )
extern

Definition at line 3666 of file repack.c.

3667{
3668 MemoryContext oldcontext;
3670
3671 /*
3672 * Nothing to do if we haven't launched the worker yet or have already
3673 * terminated it.
3674 */
3675 if (decoding_worker == NULL)
3676 return;
3677
3678 /*
3679 * This is invoked from ProcessInterrupts(), and since some of the
3680 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3681 * for recursive calls if more signals are received while this runs. It's
3682 * unclear that recursive entry would be safe, and it doesn't seem useful
3683 * even if it is safe, so let's block interrupts until done.
3684 */
3686
3687 /*
3688 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3689 * don't want to risk leaking data into long-lived contexts, so let's do
3690 * our work here in a private context that we can reset on each use.
3691 */
3692 if (hpm_context == NULL) /* first time through? */
3694 "ProcessRepackMessages",
3696 else
3698
3699 oldcontext = MemoryContextSwitchTo(hpm_context);
3700
3701 /* OK to process messages. Reset the flag saying there are more to do. */
3702 RepackMessagePending = false;
3703
3704 /*
3705 * Read as many messages as we can from the worker, but stop when no more
3706 * messages can be read from the worker without blocking.
3707 */
3708 while (true)
3709 {
3710 shm_mq_result res;
3711 Size nbytes;
3712 void *data;
3713
3715 &data, true);
3716 if (res == SHM_MQ_WOULD_BLOCK)
3717 break;
3718 else if (res == SHM_MQ_SUCCESS)
3719 {
3720 StringInfoData msg;
3721
3722 initStringInfo(&msg);
3723 appendBinaryStringInfo(&msg, data, nbytes);
3725 pfree(msg.data);
3726 }
3727 else
3728 {
3729 /*
3730 * The decoding worker is special in that it exits as soon as it
3731 * has its work done. Thus the DETACHED result code is fine.
3732 */
3733 Assert(res == SHM_MQ_DETACHED);
3734
3735 break;
3736 }
3737 }
3738
3739 MemoryContextSwitchTo(oldcontext);
3740
3741 /* Might as well clear the context on our way out */
3743
3745}
size_t Size
Definition c.h:689
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:406
void pfree(void *pointer)
Definition mcxt.c:1619
MemoryContext TopMemoryContext
Definition mcxt.c:167
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
const void * data
static void ProcessRepackMessage(StringInfo msg)
Definition repack.c:3751
static DecodingWorker * decoding_worker
Definition repack.c:146
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
shm_mq_handle * error_mqh
Definition repack.c:142

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), Assert, StringInfoData::data, data, decoding_worker, DecodingWorker::error_mqh, fb(), HOLD_INTERRUPTS, initStringInfo(), MemoryContextReset(), MemoryContextSwitchTo(), pfree(), ProcessRepackMessage(), RepackMessagePending, RESUME_INTERRUPTS, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ RepackWorkerMain()

void RepackWorkerMain ( Datum  main_arg)
extern

Definition at line 60 of file repack_worker.c.

61{
62 dsm_segment *seg;
64 shm_mq *mq;
67 SharedFileSet *sfs;
68 Snapshot snapshot;
69
70 am_repack_worker = true;
71
73
75 if (seg == NULL)
78 errmsg("could not map dynamic shared memory segment"));
80
82
83 /* Arrange to signal the leader if we exit. */
85
86 /*
87 * Join locking group - see the comments around the call of
88 * start_repack_decoding_worker().
89 */
90 if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
91 return; /* The leader is not running anymore. */
92
93 /*
94 * Setup a queue to send error messages to the backend that launched this
95 * worker.
96 */
97 mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
99 mqh = shm_mq_attach(mq, seg, NULL);
102 shared->backend_proc_number);
103
104 /* Connect to the database. LOGIN is not required. */
107
108 /*
109 * Transaction is needed to open relation, and it also provides us with a
110 * resource owner.
111 */
113
115
116 /*
117 * Not sure the spinlock is needed here - the backend should not change
118 * anything in the shared memory until we have serialized the snapshot.
119 */
120 SpinLockAcquire(&shared->mutex);
122 sfs = &shared->sfs;
123 SpinLockRelease(&shared->mutex);
124
125 SharedFileSetAttach(sfs, seg);
126
127 /*
128 * Prepare to capture the concurrent data changes ourselves.
129 */
131
132 /* Announce that we're ready. */
133 SpinLockAcquire(&shared->mutex);
134 shared->initialized = true;
135 SpinLockRelease(&shared->mutex);
136 ConditionVariableSignal(&shared->cv);
137
138 /* There doesn't seem to a nice API to set these */
140 XactReadOnly = true;
141
142 /* Build the initial snapshot and export it. */
143 snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
144 export_initial_snapshot(snapshot, shared);
145
146 /*
147 * Only historic snapshots should be used now. Do not let us restrict the
148 * progress of xmin horizon.
149 */
151
152 for (;;)
153 {
154 bool stop = decode_concurrent_changes(decoding_ctx, shared);
155
156 if (stop)
157 break;
158
159 }
160
161 /* Cleanup. */
164}
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
#define BGWORKER_BYPASS_ROLELOGINCHECK
Definition bgworker.h:167
#define BUFFERALIGN(LEN)
Definition c.h:898
void ConditionVariableSignal(ConditionVariable *cv)
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1103
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:673
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
#define PointerGetDatum(X)
Definition postgres.h:354
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:85
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:56
static void RepackWorkerShutdown(int code, Datum arg)
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared)
static dsm_segment * worker_dsm_segment
static void export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
static LogicalDecodingContext * repack_setup_logical_decoding(Oid relid)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:444
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:71
bool BecomeLockGroupMember(PGPROC *leader, int pid)
Definition proc.c:2105
ConditionVariable cv
char error_queue[FLEXIBLE_ARRAY_MEMBER]
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References am_repack_worker, Assert, DecodingWorkerShared::backend_pid, DecodingWorkerShared::backend_proc, DecodingWorkerShared::backend_proc_number, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BGWORKER_BYPASS_ROLELOGINCHECK, BUFFERALIGN, CommitTransactionCommand(), ConditionVariableSignal(), DecodingWorkerShared::cv, DatumGetUInt32(), DecodingWorkerShared::dbid, decode_concurrent_changes(), dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg, ERROR, DecodingWorkerShared::error_queue, export_initial_snapshot(), fb(), DecodingWorkerShared::initialized, InvalidateCatalogSnapshot(), DecodingWorkerShared::lsn_upto, DecodingWorkerShared::mutex, MyProc, PointerGetDatum, pq_redirect_to_shm_mq(), pq_set_parallel_leader(), DecodingWorkerShared::relid, repack_cleanup_logical_decoding(), repack_setup_logical_decoding(), RepackWorkerShutdown(), DecodingWorkerShared::roleid, DecodingWorkerShared::sfs, SharedFileSetAttach(), shm_mq_attach(), shm_mq_set_sender(), SnapBuildInitialSnapshot(), SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), worker_dsm_segment, XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XLogRecPtrIsValid.

Variable Documentation

◆ RepackMessagePending

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
extern