PostgreSQL Source Code git master
Loading...
Searching...
No Matches
repack.h File Reference
#include <signal.h>
#include "nodes/parsenodes.h"
#include "parser/parse_node.h"
#include "storage/lockdefs.h"
#include "utils/relcache.h"
Include dependency graph for repack.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ClusterParams
 

Macros

#define CLUOPT_VERBOSE   0x01 /* print progress info */
 
#define CLUOPT_RECHECK   0x02 /* recheck relation state */
 
#define CLUOPT_RECHECK_ISCLUSTERED
 
#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */
 
#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */
 

Typedefs

typedef struct ClusterParams ClusterParams
 

Functions

void ExecRepack (ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
 
void cluster_rel (RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
 
void check_index_is_clusterable (Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
 
void mark_index_clustered (Relation rel, Oid indexOid, bool is_internal)
 
Oid make_new_heap (Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
 
void finish_heap_swap (Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
 
void HandleRepackMessageInterrupt (void)
 
void ProcessRepackMessages (void)
 
void RepackWorkerMain (Datum main_arg)
 
bool AmRepackWorker (void)
 

Variables

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
 

Macro Definition Documentation

◆ CLUOPT_ANALYZE

#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */

Definition at line 28 of file repack.h.

◆ CLUOPT_CONCURRENT

#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */

Definition at line 29 of file repack.h.

◆ CLUOPT_RECHECK

#define CLUOPT_RECHECK   0x02 /* recheck relation state */

Definition at line 26 of file repack.h.

◆ CLUOPT_RECHECK_ISCLUSTERED

#define CLUOPT_RECHECK_ISCLUSTERED
Value:
0x04 /* recheck relation state for
* indisclustered */

Definition at line 27 of file repack.h.

34{
35 uint32 options; /* bitmask of CLUOPT_* */
37
39
40
41extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
42
43extern void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
44 ClusterParams *params, bool isTopLevel);
46 LOCKMODE lockmode);
47extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
48
50 char relpersistence, LOCKMODE lockmode);
55 bool is_internal,
56 bool reindex,
59 char newrelpersistence);
60
61extern void HandleRepackMessageInterrupt(void);
62extern void ProcessRepackMessages(void);
63
64/* in repack_worker.c */
66extern bool AmRepackWorker(void);
67
68#endif /* REPACK_H */
#define PGDLLIMPORT
Definition c.h:1421
TransactionId MultiXactId
Definition c.h:746
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
#define stmt
int LOCKMODE
Definition lockdefs.h:26
RepackCommand
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
static int fb(int x)
bool AmRepackWorker(void)
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition repack.c:1864
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:752
void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
Definition repack.c:238
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:503
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
Definition repack.c:1107
void HandleRepackMessageInterrupt(void)
Definition repack.c:3504
void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
Definition repack.c:812
void RepackWorkerMain(Datum main_arg)
PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
Definition repack.c:147
void ProcessRepackMessages(void)
Definition repack.c:3515

◆ CLUOPT_VERBOSE

#define CLUOPT_VERBOSE   0x01 /* print progress info */

Definition at line 25 of file repack.h.

Typedef Documentation

◆ ClusterParams

Function Documentation

◆ AmRepackWorker()

bool AmRepackWorker ( void  )
extern

Definition at line 187 of file repack_worker.c.

188{
189 return am_repack_worker;
190}
static bool am_repack_worker

References am_repack_worker.

Referenced by mq_putmessage().

◆ check_index_is_clusterable()

void check_index_is_clusterable ( Relation  OldHeap,
Oid  indexOid,
LOCKMODE  lockmode 
)
extern

Definition at line 752 of file repack.c.

753{
755
756 OldIndex = index_open(indexOid, lockmode);
757
758 /*
759 * Check that index is in fact an index on the given relation
760 */
761 if (OldIndex->rd_index == NULL ||
762 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
765 errmsg("\"%s\" is not an index for table \"%s\"",
768
769 /* Index AM must allow clustering */
770 if (!OldIndex->rd_indam->amclusterable)
773 errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
775
776 /*
777 * Disallow clustering on incomplete indexes (those that might not index
778 * every row of the relation). We could relax this by making a separate
779 * seqscan pass over the table to copy the missing rows, but that seems
780 * expensive and tedious.
781 */
782 if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
785 errmsg("cannot cluster on partial index \"%s\"",
787
788 /*
789 * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
790 * it might well not contain entries for every heap row, or might not even
791 * be internally consistent. (But note that we don't check indcheckxmin;
792 * the worst consequence of following broken HOT chains would be that we
793 * might put recently-dead tuples out-of-order in the new table, and there
794 * is little harm in that.)
795 */
796 if (!OldIndex->rd_index->indisvalid)
799 errmsg("cannot cluster on invalid index \"%s\"",
801
802 /* Drop relcache refcnt on OldIndex, but keep lock */
804}
int errcode(int sqlerrcode)
Definition elog.c:875
#define ERROR
Definition elog.h:40
#define ereport(elevel,...)
Definition elog.h:152
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition heaptuple.c:456
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
#define NoLock
Definition lockdefs.h:34
static char * errmsg
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetRelationName(relation)
Definition rel.h:550

References ereport, errcode(), errmsg, ERROR, fb(), heap_attisnull(), index_close(), index_open(), NoLock, RelationGetRelationName, and RelationGetRelid.

Referenced by ATExecClusterOn(), cluster_rel(), ExecRepack(), and process_single_relation().

◆ cluster_rel()

void cluster_rel ( RepackCommand  cmd,
Relation  OldHeap,
Oid  indexOid,
ClusterParams *params,
bool  isTopLevel 
)
extern

Definition at line 503 of file repack.c.

505{
506 Oid tableOid = RelationGetRelid(OldHeap);
509 Oid save_userid;
510 int save_sec_context;
511 int save_nestlevel;
512 bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
513 bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
514 bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
516
517 /* Determine the lock mode to use. */
518 lmode = RepackLockLevel(concurrent);
519
520 /*
521 * Check some preconditions in the concurrent case. This also obtains the
522 * replica index OID.
523 */
524 if (concurrent)
526
527 /* Check for user-requested abort. */
529
532
533 /*
534 * Switch to the table owner's userid, so that any index functions are run
535 * as that user. Also lock down security-restricted operations and
536 * arrange to make GUC variable changes local to this command.
537 */
538 GetUserIdAndSecContext(&save_userid, &save_sec_context);
539 SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
540 save_sec_context | SECURITY_RESTRICTED_OPERATION);
541 save_nestlevel = NewGUCNestLevel();
543
544 /*
545 * Recheck that the relation is still what it was when we started.
546 *
547 * Note that it's critical to skip this in single-relation CLUSTER;
548 * otherwise, we would reject an attempt to cluster using a
549 * not-previously-clustered index.
550 */
551 if (recheck &&
552 !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
553 lmode, params->options))
554 goto out;
555
556 /*
557 * We allow repacking shared catalogs only when not using an index. It
558 * would work to use an index in most respects, but the index would only
559 * get marked as indisclustered in the current database, leading to
560 * unexpected behavior if CLUSTER were later invoked in another database.
561 */
562 if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
565 /*- translator: first %s is name of a SQL command, eg. REPACK */
566 errmsg("cannot execute %s on a shared catalog",
568
569 /*
570 * The CONCURRENTLY case should have been rejected earlier because it does
571 * not support system catalogs.
572 */
573 Assert(!(OldHeap->rd_rel->relisshared && concurrent));
574
575 /*
576 * Don't process temp tables of other backends ... their local buffer
577 * manager is not going to cope.
578 */
582 /*- translator: first %s is name of a SQL command, eg. REPACK */
583 errmsg("cannot execute %s on temporary tables of other sessions",
585
586 /*
587 * Also check for active uses of the relation in the current transaction,
588 * including open scans and pending AFTER trigger events.
589 */
591
592 /* Check heap and index are valid to cluster on */
593 if (OidIsValid(indexOid))
594 {
595 /* verify the index is good and lock it */
597 /* also open it */
598 index = index_open(indexOid, NoLock);
599 }
600 else
601 index = NULL;
602
603 /*
604 * When allow_system_table_mods is turned off, we disallow repacking a
605 * catalog on a particular index unless that's already the clustered index
606 * for that catalog.
607 *
608 * XXX We don't check for this in CLUSTER, because it's historically been
609 * allowed.
610 */
611 if (cmd != REPACK_COMMAND_CLUSTER &&
612 !allowSystemTableMods && OidIsValid(indexOid) &&
613 IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
616 errmsg("permission denied: \"%s\" is a system catalog",
618 errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
619 "allow_system_table_mods"));
620
621 /*
622 * Quietly ignore the request if this is a materialized view which has not
623 * been populated from its query. No harm is done because there is no data
624 * to deal with, and we don't want to throw an error if this is part of a
625 * multi-relation request -- for example, CLUSTER was run on the entire
626 * database.
627 */
628 if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
630 {
631 if (index)
634 goto out;
635 }
636
637 Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
638 OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
639 OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
640
641 /*
642 * All predicate locks on the tuples or pages are about to be made
643 * invalid, because we move tuples around. Promote them to relation
644 * locks. Predicate locks on indexes will be promoted when they are
645 * reindexed.
646 *
647 * During concurrent processing, the heap as well as its indexes stay in
648 * operation, so we postpone this step until they are locked using
649 * AccessExclusiveLock near the end of the processing.
650 */
651 if (!concurrent)
653
654 /* rebuild_relation does all the dirty work */
655 PG_TRY();
656 {
658 }
659 PG_FINALLY();
660 {
661 if (concurrent)
662 {
663 /*
664 * Since during normal operation the worker was already asked to
665 * exit, stopping it explicitly is especially important on ERROR.
666 * However it still seems a good practice to make sure that the
667 * worker never survives the REPACK command.
668 */
670 }
671 }
672 PG_END_TRY();
673
674 /* rebuild_relation closes OldHeap, and index if valid */
675
676out:
677 /* Roll back any GUC changes executed by index functions */
678 AtEOXact_GUC(false, save_nestlevel);
679
680 /* Restore userid and security context */
681 SetUserIdAndSecContext(save_userid, save_sec_context);
682
684}
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_param(int index, int64 val)
void pgstat_progress_end_command(void)
@ PROGRESS_COMMAND_REPACK
#define Assert(condition)
Definition c.h:943
#define OidIsValid(objectId)
Definition c.h:858
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define PG_END_TRY(...)
Definition elog.h:399
#define PG_FINALLY(...)
Definition elog.h:391
bool allowSystemTableMods
Definition globals.c:132
int NewGUCNestLevel(void)
Definition guc.c:2142
void RestrictSearchPath(void)
Definition guc.c:2153
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition guc.c:2169
#define SECURITY_RESTRICTED_OPERATION
Definition miscadmin.h:331
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition miscinit.c:613
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition miscinit.c:620
@ REPACK_COMMAND_CLUSTER
static int verbose
#define InvalidOid
void TransferPredicateLocksToHeapRelation(Relation relation)
Definition predicate.c:3052
#define PROGRESS_REPACK_COMMAND
Definition progress.h:85
#define RelationIsPopulated(relation)
Definition rel.h:688
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:669
static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid, Oid userid, LOCKMODE lmode, int options)
Definition repack.c:691
static void check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
Definition repack.c:878
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:752
static void stop_repack_decoding_worker(void)
Definition repack.c:3391
static LOCKMODE RepackLockLevel(bool concurrent)
Definition repack.c:470
static const char * RepackCommandAsString(RepackCommand cmd)
Definition repack.c:2440
static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid ident_idx)
Definition repack.c:960
#define CLUOPT_VERBOSE
Definition repack.h:25
#define CLUOPT_CONCURRENT
Definition repack.h:29
#define CLUOPT_RECHECK
Definition repack.h:26
void relation_close(Relation relation, LOCKMODE lockmode)
Definition relation.c:206
uint32 options
Definition repack.h:34
Definition type.h:96
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition tablecmds.c:4473

References allowSystemTableMods, Assert, AtEOXact_GUC(), check_concurrent_repack_requirements(), CHECK_FOR_INTERRUPTS, check_index_is_clusterable(), CheckTableNotInUse(), CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_VERBOSE, cluster_rel_recheck(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetUserIdAndSecContext(), index_close(), index_open(), InvalidOid, IsCatalogRelation(), NewGUCNestLevel(), NoLock, OidIsValid, ClusterParams::options, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_progress_end_command(), pgstat_progress_start_command(), pgstat_progress_update_param(), PROGRESS_COMMAND_REPACK, PROGRESS_REPACK_COMMAND, rebuild_relation(), relation_close(), RELATION_IS_OTHER_TEMP, RelationGetRelationName, RelationGetRelid, RelationIsPopulated, REPACK_COMMAND_CLUSTER, RepackCommandAsString(), RepackLockLevel(), RestrictSearchPath(), SECURITY_RESTRICTED_OPERATION, SetUserIdAndSecContext(), stop_repack_decoding_worker(), TransferPredicateLocksToHeapRelation(), and verbose.

Referenced by ExecRepack(), process_single_relation(), and vacuum_rel().

◆ ExecRepack()

void ExecRepack ( ParseState *pstate,
RepackStmt *stmt,
bool  isTopLevel 
)
extern

Definition at line 238 of file repack.c.

239{
240 ClusterParams params = {0};
241 Relation rel = NULL;
243 LOCKMODE lockmode;
244 List *rtcs;
245
246 /* Parse option list */
247 foreach_node(DefElem, opt, stmt->params)
248 {
249 if (strcmp(opt->defname, "verbose") == 0)
250 params.options |= defGetBoolean(opt) ? CLUOPT_VERBOSE : 0;
251 else if (strcmp(opt->defname, "analyze") == 0 ||
252 strcmp(opt->defname, "analyse") == 0)
253 params.options |= defGetBoolean(opt) ? CLUOPT_ANALYZE : 0;
254 else if (strcmp(opt->defname, "concurrently") == 0 &&
255 defGetBoolean(opt))
256 {
257 if (stmt->command != REPACK_COMMAND_REPACK)
260 errmsg("CONCURRENTLY option not supported for %s",
261 RepackCommandAsString(stmt->command)));
262 params.options |= CLUOPT_CONCURRENT;
263 }
264 else
267 errmsg("unrecognized %s option \"%s\"",
268 RepackCommandAsString(stmt->command),
269 opt->defname),
270 parser_errposition(pstate, opt->location));
271 }
272
273 /* Determine the lock mode to use. */
274 lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
275
276 if ((params.options & CLUOPT_CONCURRENT) != 0)
277 {
278 /*
279 * Make sure we're not in a transaction block.
280 *
281 * The reason is that repack_setup_logical_decoding() could wait
282 * indefinitely for our XID to complete. (The deadlock detector would
283 * not recognize it because we'd be waiting for ourselves, i.e. no
284 * real lock conflict.) It would be possible to run in a transaction
285 * block if we had no XID, but this restriction is simpler for users
286 * to understand and we don't lose any functionality.
287 */
288 PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
289 }
290
291 /*
292 * If a single relation is specified, process it and we're done ... unless
293 * the relation is a partitioned table, in which case we fall through.
294 */
295 if (stmt->relation != NULL)
296 {
297 rel = process_single_relation(stmt, lockmode, isTopLevel, &params);
298 if (rel == NULL)
299 return; /* all done */
300 }
301
302 /*
303 * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
304 * can add support for this later.
305 */
306 if (params.options & CLUOPT_ANALYZE)
309 errmsg("cannot execute %s on multiple tables",
310 "REPACK (ANALYZE)"));
311
312 /*
313 * By here, we know we are in a multi-table situation.
314 *
315 * Concurrent processing is currently considered rather special (e.g. in
316 * terms of resources consumed) so it is not performed in bulk.
317 */
318 if (params.options & CLUOPT_CONCURRENT)
319 {
320 if (rel != NULL)
321 {
322 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
325 errmsg("REPACK (CONCURRENTLY) is not supported for partitioned tables"),
326 errhint("Consider running the command on individual partitions."));
327 }
328 else
331 errmsg("REPACK (CONCURRENTLY) requires an explicit table name"));
332 }
333
334 /*
335 * In order to avoid holding locks for too long, we want to process each
336 * table in its own transaction. This forces us to disallow running
337 * inside a user transaction block.
338 */
340
341 /* Also, we need a memory context to hold our list of relations */
343 "Repack",
345
346 /*
347 * Since we open a new transaction for each relation, we have to check
348 * that the relation still is what we think it is.
349 *
350 * In single-transaction CLUSTER, we don't need the overhead.
351 */
352 params.options |= CLUOPT_RECHECK;
353
354 /*
355 * If we don't have a relation yet, determine a relation list. If we do,
356 * then it must be a partitioned table, and we want to process its
357 * partitions.
358 */
359 if (rel == NULL)
360 {
361 Assert(stmt->indexname == NULL);
362 rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
365 }
366 else
367 {
368 Oid relid;
369 bool rel_is_index;
370
371 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
372
373 /*
374 * If USING INDEX was specified, resolve the index name now and pass
375 * it down.
376 */
377 if (stmt->usingindex)
378 {
379 /*
380 * If no index name was specified when repacking a partitioned
381 * table, punt for now. Maybe we can improve this later.
382 */
383 if (!stmt->indexname)
384 {
385 if (stmt->command == REPACK_COMMAND_CLUSTER)
388 errmsg("there is no previously clustered index for table \"%s\"",
390 else
393 /*- translator: first %s is name of a SQL command, eg. REPACK */
394 errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
395 RepackCommandAsString(stmt->command),
397 }
398
399 relid = determine_clustered_index(rel, stmt->usingindex,
400 stmt->indexname);
401 if (!OidIsValid(relid))
402 elog(ERROR, "unable to determine index to cluster on");
404
405 rel_is_index = true;
406 }
407 else
408 {
409 relid = RelationGetRelid(rel);
410 rel_is_index = false;
411 }
412
414 relid, rel_is_index,
416
417 /* close parent relation, releasing lock on it */
419 rel = NULL;
420 }
421
422 /* Commit to get out of starting transaction */
425
426 /* Cluster the tables, each in a separate transaction */
427 Assert(rel == NULL);
429 {
430 /* Start a new transaction for each relation. */
432
433 /*
434 * Open the target table, coping with the case where it has been
435 * dropped.
436 */
437 rel = try_table_open(rtc->tableOid, lockmode);
438 if (rel == NULL)
439 {
441 continue;
442 }
443
444 /* functions in indexes may want a snapshot set */
446
447 /* Process this table */
448 cluster_rel(stmt->command, rel, rtc->indexOid, &params, isTopLevel);
449 /* cluster_rel closes the relation, but keeps lock */
450
453 }
454
455 /* Start a new transaction for the cleanup work. */
457
458 /* Clean up working storage */
460}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errhint(const char *fmt,...) pg_attribute_printf(1
#define elog(elevel,...)
Definition elog.h:228
#define AccessExclusiveLock
Definition lockdefs.h:43
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
MemoryContext PortalContext
Definition mcxt.c:175
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
int parser_errposition(ParseState *pstate, int location)
Definition parse_node.c:106
@ REPACK_COMMAND_REPACK
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
#define foreach_node(type, var, lst)
Definition pg_list.h:528
static List * get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid, bool rel_is_index, MemoryContext permcxt)
Definition repack.c:2215
static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, ClusterParams *params)
Definition repack.c:2306
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:503
static List * get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
Definition repack.c:2085
static Oid determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
Definition repack.c:2397
#define CLUOPT_ANALYZE
Definition repack.h:28
#define CLUOPT_RECHECK_ISCLUSTERED
Definition repack.h:27
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
Definition pg_list.h:54
Form_pg_class rd_rel
Definition rel.h:111
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:60
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207

References AccessExclusiveLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, check_index_is_clusterable(), CLUOPT_ANALYZE, CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_RECHECK_ISCLUSTERED, CLUOPT_VERBOSE, cluster_rel(), CommitTransactionCommand(), defGetBoolean(), determine_clustered_index(), elog, ereport, errcode(), errhint(), errmsg, ERROR, fb(), foreach_node, foreach_ptr, get_tables_to_repack(), get_tables_to_repack_partitioned(), GetTransactionSnapshot(), MemoryContextDelete(), OidIsValid, ClusterParams::options, parser_errposition(), PopActiveSnapshot(), PortalContext, PreventInTransactionBlock(), process_single_relation(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, REPACK_COMMAND_CLUSTER, REPACK_COMMAND_REPACK, RepackCommandAsString(), RepackLockLevel(), StartTransactionCommand(), stmt, table_close(), and try_table_open().

Referenced by standard_ProcessUtility().

◆ finish_heap_swap()

void finish_heap_swap ( Oid  OIDOldHeap,
Oid  OIDNewHeap,
bool  is_system_catalog,
bool  swap_toast_by_content,
bool  check_constraints,
bool  is_internal,
bool  reindex,
TransactionId  frozenXid,
MultiXactId  cutoffMulti,
char  newrelpersistence 
)
extern

Definition at line 1864 of file repack.c.

1873{
1874 ObjectAddress object;
1875 Oid mapped_tables[4];
1876 int i;
1877
1878 /* Report that we are now swapping relation files */
1881
1882 /* Zero out possible results from swapped_relation_files */
1883 memset(mapped_tables, 0, sizeof(mapped_tables));
1884
1885 /*
1886 * Swap the contents of the heap relations (including any toast tables).
1887 * Also set old heap's relfrozenxid to frozenXid.
1888 */
1891 swap_toast_by_content, is_internal,
1893
1894 /*
1895 * If it's a system catalog, queue a sinval message to flush all catcaches
1896 * on the catalog when we reach CommandCounterIncrement.
1897 */
1900
1901 if (reindex)
1902 {
1903 int reindex_flags;
1905
1906 /*
1907 * Rebuild each index on the relation (but not the toast table, which
1908 * is all-new at this point). It is important to do this before the
1909 * DROP step because if we are processing a system catalog that will
1910 * be used during DROP, we want to have its indexes available. There
1911 * is no advantage to the other order anyway because this is all
1912 * transactional, so no chance to reclaim disk space before commit. We
1913 * do not need a final CommandCounterIncrement() because
1914 * reindex_relation does it.
1915 *
1916 * Note: because index_build is called via reindex_relation, it will
1917 * never set indcheckxmin true for the indexes. This is OK even
1918 * though in some sense we are building new indexes rather than
1919 * rebuilding existing ones, because the new heap won't contain any
1920 * HOT chains at all, let alone broken ones, so it can't be necessary
1921 * to set indcheckxmin.
1922 */
1926
1927 /*
1928 * Ensure that the indexes have the same persistence as the parent
1929 * relation.
1930 */
1931 if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1933 else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1935
1936 /* Report that we are now reindexing relations */
1939
1941 }
1942
1943 /* Report that we are now doing clean up */
1946
1947 /*
1948 * If the relation being rebuilt is pg_class, swap_relation_files()
1949 * couldn't update pg_class's own pg_class entry (check comments in
1950 * swap_relation_files()), thus relfrozenxid was not updated. That's
1951 * annoying because a potential reason for doing a VACUUM FULL is an
1952 * imminent or actual anti-wraparound shutdown. So, now that we can
1953 * access the new relation using its indices, update relfrozenxid.
1954 * pg_class doesn't have a toast relation, so we don't need to update the
1955 * corresponding toast relation. Note that there's little point moving all
1956 * relfrozenxid updates here since swap_relation_files() needs to write to
1957 * pg_class for non-mapped relations anyway.
1958 */
1960 {
1964
1966
1969 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1971
1972 relform->relfrozenxid = frozenXid;
1973 relform->relminmxid = cutoffMulti;
1974
1976
1978 }
1979
1980 /* Destroy new heap with old filenumber */
1981 object.classId = RelationRelationId;
1982 object.objectId = OIDNewHeap;
1983 object.objectSubId = 0;
1984
1985 if (!reindex)
1986 {
1987 /*
1988 * Make sure the changes in pg_class are visible. This is especially
1989 * important if !swap_toast_by_content, so that the correct TOAST
1990 * relation is dropped. (reindex_relation() above did not help in this
1991 * case.)
1992 */
1994 }
1995
1996 /*
1997 * The new relation is local to our transaction and we know nothing
1998 * depends on it, so DROP_RESTRICT should be OK.
1999 */
2001
2002 /* performDeletion does CommandCounterIncrement at end */
2003
2004 /*
2005 * Now we must remove any relation mapping entries that we set up for the
2006 * transient table, as well as its toast table and toast index if any. If
2007 * we fail to do this before commit, the relmapper will complain about new
2008 * permanent map entries being added post-bootstrap.
2009 */
2010 for (i = 0; OidIsValid(mapped_tables[i]); i++)
2012
2013 /*
2014 * At this point, everything is kosher except that, if we did toast swap
2015 * by links, the toast table's name corresponds to the transient table.
2016 * The name is irrelevant to the backend because it's referenced by OID,
2017 * but users looking at the catalogs could be confused. Rename it to
2018 * prevent this problem.
2019 *
2020 * Note no lock required on the relation, because we already hold an
2021 * exclusive lock on it.
2022 */
2024 {
2026
2028 if (OidIsValid(newrel->rd_rel->reltoastrelid))
2029 {
2030 Oid toastidx;
2032
2033 /* Get the associated valid index to be renamed */
2034 toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2036
2037 /* rename the toast table ... */
2038 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2039 OIDOldHeap);
2040 RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2041 NewToastName, true, false);
2042
2043 /* ... and its valid index too. */
2044 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2045 OIDOldHeap);
2046
2048 NewToastName, true, true);
2049
2050 /*
2051 * Reset the relrewrite for the toast. The command-counter
2052 * increment is required here as we are about to update the tuple
2053 * that is updated as part of RenameRelationInternal.
2054 */
2056 ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2057 }
2059 }
2060
2061 /* if it's not a catalog table, clear any missing attribute settings */
2062 if (!is_system_catalog)
2063 {
2065
2069 }
2070}
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition dependency.c:279
#define PERFORM_DELETION_INTERNAL
Definition dependency.h:92
void RelationClearMissing(Relation rel)
Definition heap.c:1978
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
bool reindex_relation(const ReindexStmt *stmt, Oid relid, int flags, const ReindexParams *params)
Definition index.c:3969
#define REINDEX_REL_FORCE_INDEXES_UNLOGGED
Definition index.h:169
#define REINDEX_REL_SUPPRESS_INDEX_USE
Definition index.h:167
#define REINDEX_REL_FORCE_INDEXES_PERMANENT
Definition index.h:170
#define REINDEX_REL_CHECK_CONSTRAINTS
Definition index.h:168
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CacheInvalidateCatalog(Oid catalogId)
Definition inval.c:1612
int i
Definition isn.c:77
#define RowExclusiveLock
Definition lockdefs.h:38
@ DROP_RESTRICT
FormData_pg_class * Form_pg_class
Definition pg_class.h:160
#define NAMEDATALEN
#define snprintf
Definition port.h:260
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
#define PROGRESS_REPACK_PHASE
Definition progress.h:86
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES
Definition progress.h:104
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP
Definition progress.h:106
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX
Definition progress.h:105
void RelationMapRemoveMapping(Oid relationId)
Definition relmapper.c:439
static void swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, bool swap_toast_by_content, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, Oid *mapped_tables)
Definition repack.c:1482
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ResetRelRewrite(Oid myrelid)
Definition tablecmds.c:4420
void RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bool is_index)
Definition tablecmds.c:4327
Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock)
void CommandCounterIncrement(void)
Definition xact.c:1130

References AccessExclusiveLock, CacheInvalidateCatalog(), CatalogTupleUpdate(), CommandCounterIncrement(), DROP_RESTRICT, elog, ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, i, NAMEDATALEN, NoLock, ObjectIdGetDatum(), OidIsValid, PERFORM_DELETION_INTERNAL, performDeletion(), pgstat_progress_update_param(), PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP, PROGRESS_REPACK_PHASE_REBUILD_INDEX, PROGRESS_REPACK_PHASE_SWAP_REL_FILES, REINDEX_REL_CHECK_CONSTRAINTS, REINDEX_REL_FORCE_INDEXES_PERMANENT, REINDEX_REL_FORCE_INDEXES_UNLOGGED, REINDEX_REL_SUPPRESS_INDEX_USE, reindex_relation(), relation_close(), RelationClearMissing(), RelationMapRemoveMapping(), RenameRelationInternal(), ResetRelRewrite(), RowExclusiveLock, SearchSysCacheCopy1, snprintf, swap_relation_files(), table_close(), table_open(), and toast_get_valid_index().

Referenced by ATRewriteTables(), rebuild_relation(), rebuild_relation_finish_concurrent(), and refresh_by_heap_swap().

◆ HandleRepackMessageInterrupt()

void HandleRepackMessageInterrupt ( void  )
extern

Definition at line 3504 of file repack.c.

3505{
3506 InterruptPending = true;
3507 RepackMessagePending = true;
3509}
volatile sig_atomic_t InterruptPending
Definition globals.c:32
struct Latch * MyLatch
Definition globals.c:65
void SetLatch(Latch *latch)
Definition latch.c:290
volatile sig_atomic_t RepackMessagePending
Definition repack.c:147

References InterruptPending, MyLatch, RepackMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ make_new_heap()

Oid make_new_heap ( Oid  OIDOldHeap,
Oid  NewTableSpace,
Oid  NewAccessMethod,
char  relpersistence,
LOCKMODE  lockmode 
)
extern

Definition at line 1107 of file repack.c.

1109{
1113 Oid toastid;
1115 HeapTuple tuple;
1116 Datum reloptions;
1117 bool isNull;
1119
1120 OldHeap = table_open(OIDOldHeap, lockmode);
1122
1123 /*
1124 * Note that the NewHeap will not receive any of the defaults or
1125 * constraints associated with the OldHeap; we don't need 'em, and there's
1126 * no reason to spend cycles inserting them into the catalogs only to
1127 * delete them.
1128 */
1129
1130 /*
1131 * But we do want to use reloptions of the old heap for new heap.
1132 */
1134 if (!HeapTupleIsValid(tuple))
1135 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1136 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1137 &isNull);
1138 if (isNull)
1139 reloptions = (Datum) 0;
1140
1141 if (relpersistence == RELPERSISTENCE_TEMP)
1143 else
1145
1146 /*
1147 * Create the new heap, using a temporary name in the same namespace as
1148 * the existing table. NOTE: there is some risk of collision with user
1149 * relnames. Working around this seems more trouble than it's worth; in
1150 * particular, we can't create the new heap in a different namespace from
1151 * the old, or we will have problems with the TEMP status of temp tables.
1152 *
1153 * Note: the new heap is not a shared relation, even if we are rebuilding
1154 * a shared rel. However, we do make the new heap mapped if the source is
1155 * mapped. This simplifies swap_relation_files, and is absolutely
1156 * necessary for rebuilding pg_class, for reasons explained there.
1157 */
1158 snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1159
1163 InvalidOid,
1164 InvalidOid,
1165 InvalidOid,
1166 OldHeap->rd_rel->relowner,
1169 NIL,
1171 relpersistence,
1172 false,
1175 reloptions,
1176 false,
1177 true,
1178 true,
1179 OIDOldHeap,
1180 NULL);
1182
1183 ReleaseSysCache(tuple);
1184
1185 /*
1186 * Advance command counter so that the newly-created relation's catalog
1187 * tuples will be visible to table_open.
1188 */
1190
1191 /*
1192 * If necessary, create a TOAST table for the new relation.
1193 *
1194 * If the relation doesn't have a TOAST table already, we can't need one
1195 * for the new relation. The other way around is possible though: if some
1196 * wide columns have been dropped, NewHeapCreateToastTable can decide that
1197 * no TOAST table is needed for the new table.
1198 *
1199 * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1200 * that the TOAST table will be visible for insertion.
1201 */
1202 toastid = OldHeap->rd_rel->reltoastrelid;
1203 if (OidIsValid(toastid))
1204 {
1205 /* keep the existing toast table's reloptions, if any */
1207 if (!HeapTupleIsValid(tuple))
1208 elog(ERROR, "cache lookup failed for relation %u", toastid);
1209 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1210 &isNull);
1211 if (isNull)
1212 reloptions = (Datum) 0;
1213
1214 NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1215
1216 ReleaseSysCache(tuple);
1217 }
1218
1220
1221 return OIDNewHeap;
1222}
Oid heap_create_with_catalog(const char *relname, Oid relnamespace, Oid reltablespace, Oid relid, Oid reltypeid, Oid reloftypeid, Oid ownerid, Oid accessmtd, TupleDesc tupdesc, List *cooked_constraints, char relkind, char relpersistence, bool shared_relation, bool mapped_relation, OnCommitAction oncommit, Datum reloptions, bool use_user_acl, bool allow_system_table_mods, bool is_internal, Oid relrewrite, ObjectAddress *typaddress)
Definition heap.c:1136
Oid LookupCreationNamespace(const char *nspname)
Definition namespace.c:3500
#define NIL
Definition pg_list.h:68
@ ONCOMMIT_NOOP
Definition primnodes.h:59
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationIsMapped(relation)
Definition rel.h:565
#define RelationGetNamespace(relation)
Definition rel.h:557
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
void NewHeapCreateToastTable(Oid relOid, Datum reloptions, LOCKMODE lockmode, Oid OIDOldToast)
Definition toasting.c:65

References Assert, CommandCounterIncrement(), elog, ERROR, fb(), heap_create_with_catalog(), HeapTupleIsValid, InvalidOid, LookupCreationNamespace(), NAMEDATALEN, NewHeapCreateToastTable(), NIL, NoLock, ObjectIdGetDatum(), OidIsValid, ONCOMMIT_NOOP, RelationGetDescr, RelationGetNamespace, RelationIsMapped, ReleaseSysCache(), SearchSysCache1(), snprintf, SysCacheGetAttr(), table_close(), and table_open().

Referenced by ATRewriteTables(), rebuild_relation(), and RefreshMatViewByOid().

◆ mark_index_clustered()

void mark_index_clustered ( Relation  rel,
Oid  indexOid,
bool  is_internal 
)
extern

Definition at line 812 of file repack.c.

813{
818
819 Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
820
821 /*
822 * If the index is already marked clustered, no need to do anything.
823 */
824 if (OidIsValid(indexOid))
825 {
826 if (get_index_isclustered(indexOid))
827 return;
828 }
829
830 /*
831 * Check each index of the relation and set/clear the bit as needed.
832 */
834
835 foreach(index, RelationGetIndexList(rel))
836 {
838
842 elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
844
845 /*
846 * Unset the bit if set. We know it's wrong because we checked this
847 * earlier.
848 */
849 if (indexForm->indisclustered)
850 {
851 indexForm->indisclustered = false;
853 }
854 else if (thisIndexOid == indexOid)
855 {
856 /* this was checked earlier, but let's be real sure */
857 if (!indexForm->indisvalid)
858 elog(ERROR, "cannot cluster on invalid index %u", indexOid);
859 indexForm->indisclustered = true;
861 }
862
864 InvalidOid, is_internal);
865
867 }
868
870}
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
bool get_index_isclustered(Oid index_oid)
Definition lsyscache.c:3821
#define InvokeObjectPostAlterHookArg(classId, objectId, subId, auxiliaryId, is_internal)
END_CATALOG_STRUCT typedef FormData_pg_index * Form_pg_index
Definition pg_index.h:74
#define lfirst_oid(lc)
Definition pg_list.h:174
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4827

References Assert, CatalogTupleUpdate(), elog, ERROR, fb(), Form_pg_index, get_index_isclustered(), GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, InvalidOid, InvokeObjectPostAlterHookArg, lfirst_oid, ObjectIdGetDatum(), OidIsValid, RelationData::rd_rel, RelationGetIndexList(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ATExecClusterOn(), ATExecDropCluster(), and rebuild_relation().

◆ ProcessRepackMessages()

void ProcessRepackMessages ( void  )
extern

Definition at line 3515 of file repack.c.

3516{
3517 MemoryContext oldcontext;
3519
3520 /*
3521 * Nothing to do if we haven't launched the worker yet or have already
3522 * terminated it.
3523 */
3524 if (decoding_worker == NULL)
3525 return;
3526
3527 /*
3528 * This is invoked from ProcessInterrupts(), and since some of the
3529 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3530 * for recursive calls if more signals are received while this runs. It's
3531 * unclear that recursive entry would be safe, and it doesn't seem useful
3532 * even if it is safe, so let's block interrupts until done.
3533 */
3535
3536 /*
3537 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3538 * don't want to risk leaking data into long-lived contexts, so let's do
3539 * our work here in a private context that we can reset on each use.
3540 */
3541 if (hpm_context == NULL) /* first time through? */
3543 "ProcessRepackMessages",
3545 else
3547
3548 oldcontext = MemoryContextSwitchTo(hpm_context);
3549
3550 /* OK to process messages. Reset the flag saying there are more to do. */
3551 RepackMessagePending = false;
3552
3553 /*
3554 * Read as many messages as we can from the worker, but stop when no more
3555 * messages can be read from the worker without blocking.
3556 */
3557 while (true)
3558 {
3559 shm_mq_result res;
3560 Size nbytes;
3561 void *data;
3562
3564 &data, true);
3565 if (res == SHM_MQ_WOULD_BLOCK)
3566 break;
3567 else if (res == SHM_MQ_SUCCESS)
3568 {
3569 StringInfoData msg;
3570
3571 initStringInfo(&msg);
3572 appendBinaryStringInfo(&msg, data, nbytes);
3574 pfree(msg.data);
3575 }
3576 else
3577 {
3578 /*
3579 * The decoding worker is special in that it exits as soon as it
3580 * has its work done. Thus the DETACHED result code is fine.
3581 */
3582 Assert(res == SHM_MQ_DETACHED);
3583
3584 break;
3585 }
3586 }
3587
3588 MemoryContextSwitchTo(oldcontext);
3589
3590 /* Might as well clear the context on our way out */
3592
3594}
size_t Size
Definition c.h:689
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const void * data
static void ProcessRepackMessage(StringInfo msg)
Definition repack.c:3600
static DecodingWorker * decoding_worker
Definition repack.c:141
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
shm_mq_handle * error_mqh
Definition repack.c:137

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), Assert, StringInfoData::data, data, decoding_worker, DecodingWorker::error_mqh, fb(), HOLD_INTERRUPTS, initStringInfo(), MemoryContextReset(), MemoryContextSwitchTo(), pfree(), ProcessRepackMessage(), RepackMessagePending, RESUME_INTERRUPTS, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ RepackWorkerMain()

void RepackWorkerMain ( Datum  main_arg)
extern

Definition at line 60 of file repack_worker.c.

61{
62 dsm_segment *seg;
64 shm_mq *mq;
67 SharedFileSet *sfs;
68 Snapshot snapshot;
69
70 am_repack_worker = true;
71
72 /*
73 * Override the default bgworker_die() with die() so we can use
74 * CHECK_FOR_INTERRUPTS().
75 */
78
80 if (seg == NULL)
83 errmsg("could not map dynamic shared memory segment"));
85
87
88 /* Arrange to signal the leader if we exit. */
90
91 /*
92 * Join locking group - see the comments around the call of
93 * start_repack_decoding_worker().
94 */
95 if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
96 return; /* The leader is not running anymore. */
97
98 /*
99 * Set up a queue to send error messages to the backend that launched this
100 * worker.
101 */
102 mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
104 mqh = shm_mq_attach(mq, seg, NULL);
107 shared->backend_proc_number);
108
109 /* Connect to the database. LOGIN is not required. */
112
113 /*
114 * Transaction is needed to open relation, and it also provides us with a
115 * resource owner.
116 */
118
120
121 /*
122 * Not sure the spinlock is needed here - the backend should not change
123 * anything in the shared memory until we have serialized the snapshot.
124 */
125 SpinLockAcquire(&shared->mutex);
127 sfs = &shared->sfs;
128 SpinLockRelease(&shared->mutex);
129
130 SharedFileSetAttach(sfs, seg);
131
132 /*
133 * Prepare to capture the concurrent data changes ourselves.
134 */
136
137 /* Announce that we're ready. */
138 SpinLockAcquire(&shared->mutex);
139 shared->initialized = true;
140 SpinLockRelease(&shared->mutex);
141 ConditionVariableSignal(&shared->cv);
142
143 /* There doesn't seem to be a nice API to set these */
145 XactReadOnly = true;
146
147 /* Build the initial snapshot and export it. */
148 snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
149 export_initial_snapshot(snapshot, shared);
150
151 /*
152 * Only historic snapshots should be used now. Do not let us restrict the
153 * progress of xmin horizon.
154 */
156
157 for (;;)
158 {
159 bool stop = decode_concurrent_changes(decoding_ctx, shared);
160
161 if (stop)
162 break;
163
164 }
165
166 /* Cleanup. */
169}
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
#define BGWORKER_BYPASS_ROLELOGINCHECK
Definition bgworker.h:167
#define BUFFERALIGN(LEN)
Definition c.h:898
void ConditionVariableSignal(ConditionVariable *cv)
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1103
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:673
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
#define die(msg)
#define pqsignal
Definition port.h:547
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
#define PointerGetDatum(X)
Definition postgres.h:354
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:85
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:56
static void RepackWorkerShutdown(int code, Datum arg)
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared)
static dsm_segment * worker_dsm_segment
static void export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
static LogicalDecodingContext * repack_setup_logical_decoding(Oid relid)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:458
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:71
bool BecomeLockGroupMember(PGPROC *leader, int pid)
Definition proc.c:2072
ConditionVariable cv
char error_queue[FLEXIBLE_ARRAY_MEMBER]
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References am_repack_worker, Assert, DecodingWorkerShared::backend_pid, DecodingWorkerShared::backend_proc, DecodingWorkerShared::backend_proc_number, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BGWORKER_BYPASS_ROLELOGINCHECK, BUFFERALIGN, CommitTransactionCommand(), ConditionVariableSignal(), DecodingWorkerShared::cv, DatumGetUInt32(), DecodingWorkerShared::dbid, decode_concurrent_changes(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg, ERROR, DecodingWorkerShared::error_queue, export_initial_snapshot(), fb(), DecodingWorkerShared::initialized, InvalidateCatalogSnapshot(), DecodingWorkerShared::lsn_upto, DecodingWorkerShared::mutex, MyProc, PointerGetDatum, pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, DecodingWorkerShared::relid, repack_cleanup_logical_decoding(), repack_setup_logical_decoding(), RepackWorkerShutdown(), DecodingWorkerShared::roleid, DecodingWorkerShared::sfs, SharedFileSetAttach(), shm_mq_attach(), shm_mq_set_sender(), SnapBuildInitialSnapshot(), SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), worker_dsm_segment, XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XLogRecPtrIsValid.

Variable Documentation

◆ RepackMessagePending

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
extern