PostgreSQL Source Code git master
Loading...
Searching...
No Matches
repack.h File Reference
#include <signal.h>
#include "nodes/parsenodes.h"
#include "parser/parse_node.h"
#include "storage/lockdefs.h"
#include "utils/relcache.h"
Include dependency graph for repack.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ClusterParams
 

Macros

#define CLUOPT_VERBOSE   0x01 /* print progress info */
 
#define CLUOPT_RECHECK   0x02 /* recheck relation state */
 
#define CLUOPT_RECHECK_ISCLUSTERED
 
#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */
 
#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */
 

Typedefs

typedef struct ClusterParams ClusterParams
 

Functions

void ExecRepack (ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
 
void cluster_rel (RepackCommand command, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
 
void check_index_is_clusterable (Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
 
void mark_index_clustered (Relation rel, Oid indexOid, bool is_internal)
 
Oid make_new_heap (Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
 
void finish_heap_swap (Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
 
void HandleRepackMessageInterrupt (void)
 
void ProcessRepackMessages (void)
 
void RepackWorkerMain (Datum main_arg)
 
bool AmRepackWorker (void)
 

Variables

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
 

Macro Definition Documentation

◆ CLUOPT_ANALYZE

#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */

Definition at line 28 of file repack.h.

◆ CLUOPT_CONCURRENT

#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */

Definition at line 29 of file repack.h.

◆ CLUOPT_RECHECK

#define CLUOPT_RECHECK   0x02 /* recheck relation state */

Definition at line 26 of file repack.h.

◆ CLUOPT_RECHECK_ISCLUSTERED

#define CLUOPT_RECHECK_ISCLUSTERED
Value:
0x04 /* recheck relation state for
* indisclustered */

Definition at line 27 of file repack.h.

34{
35 uint32 options; /* bitmask of CLUOPT_* */
37
39
40
41extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
42
43extern void cluster_rel(RepackCommand command, Relation OldHeap, Oid indexOid,
44 ClusterParams *params, bool isTopLevel);
46 LOCKMODE lockmode);
47extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
48
50 char relpersistence, LOCKMODE lockmode);
55 bool is_internal,
56 bool reindex,
59 char newrelpersistence);
60
61extern void HandleRepackMessageInterrupt(void);
62extern void ProcessRepackMessages(void);
63
64/* in repack_worker.c */
66extern bool AmRepackWorker(void);
67
68#endif /* REPACK_H */
#define PGDLLIMPORT
Definition c.h:1421
TransactionId MultiXactId
Definition c.h:746
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
#define stmt
int LOCKMODE
Definition lockdefs.h:26
RepackCommand
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
static int fb(int x)
bool AmRepackWorker(void)
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition repack.c:1865
void cluster_rel(RepackCommand command, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:502
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:751
void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
Definition repack.c:237
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
Definition repack.c:1108
void HandleRepackMessageInterrupt(void)
Definition repack.c:3508
void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
Definition repack.c:811
void RepackWorkerMain(Datum main_arg)
PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
Definition repack.c:146
void ProcessRepackMessages(void)
Definition repack.c:3519

◆ CLUOPT_VERBOSE

#define CLUOPT_VERBOSE   0x01 /* print progress info */

Definition at line 25 of file repack.h.

Typedef Documentation

◆ ClusterParams

Function Documentation

◆ AmRepackWorker()

bool AmRepackWorker ( void  )
extern

Definition at line 183 of file repack_worker.c.

184{
185 return am_repack_worker;
186}
static bool am_repack_worker

References am_repack_worker.

Referenced by mq_putmessage().

◆ check_index_is_clusterable()

void check_index_is_clusterable ( Relation  OldHeap,
Oid  indexOid,
LOCKMODE  lockmode 
)
extern

Definition at line 751 of file repack.c.

752{
754
755 OldIndex = index_open(indexOid, lockmode);
756
757 /*
758 * Check that index is in fact an index on the given relation
759 */
760 if (OldIndex->rd_index == NULL ||
761 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
764 errmsg("\"%s\" is not an index for table \"%s\"",
767
768 /* Index AM must allow clustering */
769 if (!OldIndex->rd_indam->amclusterable)
772 errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
774
775 /*
776 * Disallow clustering on incomplete indexes (those that might not index
777 * every row of the relation). We could relax this by making a separate
778 * seqscan pass over the table to copy the missing rows, but that seems
779 * expensive and tedious.
780 */
781 if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
784 errmsg("cannot cluster on partial index \"%s\"",
786
787 /*
788 * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
789 * it might well not contain entries for every heap row, or might not even
790 * be internally consistent. (But note that we don't check indcheckxmin;
791 * the worst consequence of following broken HOT chains would be that we
792 * might put recently-dead tuples out-of-order in the new table, and there
793 * is little harm in that.)
794 */
795 if (!OldIndex->rd_index->indisvalid)
798 errmsg("cannot cluster on invalid index \"%s\"",
800
801 /* Drop relcache refcnt on OldIndex, but keep lock */
803}
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:151
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition heaptuple.c:456
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
#define NoLock
Definition lockdefs.h:34
static char * errmsg
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetRelationName(relation)
Definition rel.h:550

References ereport, errcode(), errmsg, ERROR, fb(), heap_attisnull(), index_close(), index_open(), NoLock, RelationGetRelationName, and RelationGetRelid.

Referenced by ATExecClusterOn(), cluster_rel(), ExecRepack(), and process_single_relation().

◆ cluster_rel()

void cluster_rel ( RepackCommand  command,
Relation  OldHeap,
Oid  indexOid,
ClusterParams params,
bool  isTopLevel 
)
extern

Definition at line 502 of file repack.c.

504{
505 Oid tableOid = RelationGetRelid(OldHeap);
508 Oid save_userid;
509 int save_sec_context;
510 int save_nestlevel;
511 bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
512 bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
513 bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
515
516 /* Determine the lock mode to use. */
517 lmode = RepackLockLevel(concurrent);
518
519 /*
520 * Check some preconditions in the concurrent case. This also obtains the
521 * replica index OID.
522 */
523 if (concurrent)
525
526 /* Check for user-requested abort. */
528
531
532 /*
533 * Switch to the table owner's userid, so that any index functions are run
534 * as that user. Also lock down security-restricted operations and
535 * arrange to make GUC variable changes local to this command.
536 */
537 GetUserIdAndSecContext(&save_userid, &save_sec_context);
538 SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
539 save_sec_context | SECURITY_RESTRICTED_OPERATION);
540 save_nestlevel = NewGUCNestLevel();
542
543 /*
544 * Recheck that the relation is still what it was when we started.
545 *
546 * Note that it's critical to skip this in single-relation CLUSTER;
547 * otherwise, we would reject an attempt to cluster using a
548 * not-previously-clustered index.
549 */
550 if (recheck &&
551 !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
552 lmode, params->options))
553 goto out;
554
555 /*
556 * We allow repacking shared catalogs only when not using an index. It
557 * would work to use an index in most respects, but the index would only
558 * get marked as indisclustered in the current database, leading to
559 * unexpected behavior if CLUSTER were later invoked in another database.
560 */
561 if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
564 /*- translator: first %s is name of a SQL command, eg. REPACK */
565 errmsg("cannot execute %s on a shared catalog",
567
568 /*
569 * The CONCURRENTLY case should have been rejected earlier because it does
570 * not support system catalogs.
571 */
572 Assert(!(OldHeap->rd_rel->relisshared && concurrent));
573
574 /*
575 * Don't process temp tables of other backends ... their local buffer
576 * manager is not going to cope.
577 */
581 /*- translator: first %s is name of a SQL command, eg. REPACK */
582 errmsg("cannot execute %s on temporary tables of other sessions",
584
585 /*
586 * Also check for active uses of the relation in the current transaction,
587 * including open scans and pending AFTER trigger events.
588 */
590
591 /* Check heap and index are valid to cluster on */
592 if (OidIsValid(indexOid))
593 {
594 /* verify the index is good and lock it */
596 /* also open it */
597 index = index_open(indexOid, NoLock);
598 }
599 else
600 index = NULL;
601
602 /*
603 * When allow_system_table_mods is turned off, we disallow repacking a
604 * catalog on a particular index unless that's already the clustered index
605 * for that catalog.
606 *
607 * XXX We don't check for this in CLUSTER, because it's historically been
608 * allowed.
609 */
610 if (cmd != REPACK_COMMAND_CLUSTER &&
611 !allowSystemTableMods && OidIsValid(indexOid) &&
612 IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
615 errmsg("permission denied: \"%s\" is a system catalog",
617 errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
618 "allow_system_table_mods"));
619
620 /*
621 * Quietly ignore the request if this is a materialized view which has not
622 * been populated from its query. No harm is done because there is no data
623 * to deal with, and we don't want to throw an error if this is part of a
624 * multi-relation request -- for example, CLUSTER was run on the entire
625 * database.
626 */
627 if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
629 {
630 if (index)
633 goto out;
634 }
635
636 Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
637 OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
638 OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
639
640 /*
641 * All predicate locks on the tuples or pages are about to be made
642 * invalid, because we move tuples around. Promote them to relation
643 * locks. Predicate locks on indexes will be promoted when they are
644 * reindexed.
645 *
646 * During concurrent processing, the heap as well as its indexes stay in
647 * operation, so we postpone this step until they are locked using
648 * AccessExclusiveLock near the end of the processing.
649 */
650 if (!concurrent)
652
653 /* rebuild_relation does all the dirty work */
654 PG_TRY();
655 {
657 }
658 PG_FINALLY();
659 {
660 if (concurrent)
661 {
662 /*
663 * Since during normal operation the worker was already asked to
664 * exit, stopping it explicitly is especially important on ERROR.
665 * However it still seems a good practice to make sure that the
666 * worker never survives the REPACK command.
667 */
669 }
670 }
671 PG_END_TRY();
672
673 /* rebuild_relation closes OldHeap, and index if valid */
674
675out:
676 /* Roll back any GUC changes executed by index functions */
677 AtEOXact_GUC(false, save_nestlevel);
678
679 /* Restore userid and security context */
680 SetUserIdAndSecContext(save_userid, save_sec_context);
681
683}
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_param(int index, int64 val)
void pgstat_progress_end_command(void)
@ PROGRESS_COMMAND_REPACK
#define Assert(condition)
Definition c.h:943
#define OidIsValid(objectId)
Definition c.h:858
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:373
#define PG_END_TRY(...)
Definition elog.h:398
#define PG_FINALLY(...)
Definition elog.h:390
bool allowSystemTableMods
Definition globals.c:130
int NewGUCNestLevel(void)
Definition guc.c:2142
void RestrictSearchPath(void)
Definition guc.c:2153
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition guc.c:2169
#define SECURITY_RESTRICTED_OPERATION
Definition miscadmin.h:320
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition miscinit.c:613
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition miscinit.c:620
@ REPACK_COMMAND_CLUSTER
static int verbose
#define InvalidOid
void TransferPredicateLocksToHeapRelation(Relation relation)
Definition predicate.c:3053
#define PROGRESS_REPACK_COMMAND
Definition progress.h:85
#define RelationIsPopulated(relation)
Definition rel.h:688
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:669
static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid, Oid userid, LOCKMODE lmode, int options)
Definition repack.c:690
static void check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
Definition repack.c:877
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:751
static void stop_repack_decoding_worker(void)
Definition repack.c:3395
static LOCKMODE RepackLockLevel(bool concurrent)
Definition repack.c:469
static const char * RepackCommandAsString(RepackCommand cmd)
Definition repack.c:2441
static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid ident_idx)
Definition repack.c:961
#define CLUOPT_VERBOSE
Definition repack.h:25
#define CLUOPT_CONCURRENT
Definition repack.h:29
#define CLUOPT_RECHECK
Definition repack.h:26
void relation_close(Relation relation, LOCKMODE lockmode)
Definition relation.c:206
uint32 options
Definition repack.h:34
Definition type.h:96
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition tablecmds.c:4447

References allowSystemTableMods, Assert, AtEOXact_GUC(), check_concurrent_repack_requirements(), CHECK_FOR_INTERRUPTS, check_index_is_clusterable(), CheckTableNotInUse(), CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_VERBOSE, cluster_rel_recheck(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetUserIdAndSecContext(), index_close(), index_open(), InvalidOid, IsCatalogRelation(), NewGUCNestLevel(), NoLock, OidIsValid, ClusterParams::options, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_progress_end_command(), pgstat_progress_start_command(), pgstat_progress_update_param(), PROGRESS_COMMAND_REPACK, PROGRESS_REPACK_COMMAND, rebuild_relation(), relation_close(), RELATION_IS_OTHER_TEMP, RelationGetRelationName, RelationGetRelid, RelationIsPopulated, REPACK_COMMAND_CLUSTER, RepackCommandAsString(), RepackLockLevel(), RestrictSearchPath(), SECURITY_RESTRICTED_OPERATION, SetUserIdAndSecContext(), stop_repack_decoding_worker(), TransferPredicateLocksToHeapRelation(), and verbose.

Referenced by ExecRepack(), process_single_relation(), and vacuum_rel().

◆ ExecRepack()

void ExecRepack ( ParseState *pstate,
RepackStmt *stmt,
bool  isTopLevel 
)
extern

Definition at line 237 of file repack.c.

238{
239 ClusterParams params = {0};
240 Relation rel = NULL;
242 LOCKMODE lockmode;
243 List *rtcs;
244
245 /* Parse option list */
246 foreach_node(DefElem, opt, stmt->params)
247 {
248 if (strcmp(opt->defname, "verbose") == 0)
249 params.options |= defGetBoolean(opt) ? CLUOPT_VERBOSE : 0;
250 else if (strcmp(opt->defname, "analyze") == 0 ||
251 strcmp(opt->defname, "analyse") == 0)
252 params.options |= defGetBoolean(opt) ? CLUOPT_ANALYZE : 0;
253 else if (strcmp(opt->defname, "concurrently") == 0 &&
254 defGetBoolean(opt))
255 {
256 if (stmt->command != REPACK_COMMAND_REPACK)
259 errmsg("CONCURRENTLY option not supported for %s",
260 RepackCommandAsString(stmt->command)));
261 params.options |= CLUOPT_CONCURRENT;
262 }
263 else
266 errmsg("unrecognized %s option \"%s\"",
267 RepackCommandAsString(stmt->command),
268 opt->defname),
269 parser_errposition(pstate, opt->location));
270 }
271
272 /* Determine the lock mode to use. */
273 lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
274
275 if ((params.options & CLUOPT_CONCURRENT) != 0)
276 {
277 /*
278 * Make sure we're not in a transaction block.
279 *
280 * The reason is that repack_setup_logical_decoding() could wait
281 * indefinitely for our XID to complete. (The deadlock detector would
282 * not recognize it because we'd be waiting for ourselves, i.e. no
283 * real lock conflict.) It would be possible to run in a transaction
284 * block if we had no XID, but this restriction is simpler for users
285 * to understand and we don't lose any functionality.
286 */
287 PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
288 }
289
290 /*
291 * If a single relation is specified, process it and we're done ... unless
292 * the relation is a partitioned table, in which case we fall through.
293 */
294 if (stmt->relation != NULL)
295 {
296 rel = process_single_relation(stmt, lockmode, isTopLevel, &params);
297 if (rel == NULL)
298 return; /* all done */
299 }
300
301 /*
302 * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
303 * can add support for this later.
304 */
305 if (params.options & CLUOPT_ANALYZE)
308 errmsg("cannot execute %s on multiple tables",
309 "REPACK (ANALYZE)"));
310
311 /*
312 * By here, we know we are in a multi-table situation.
313 *
314 * Concurrent processing is currently considered rather special (e.g. in
315 * terms of resources consumed) so it is not performed in bulk.
316 */
317 if (params.options & CLUOPT_CONCURRENT)
318 {
319 if (rel != NULL)
320 {
321 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
324 errmsg("REPACK (CONCURRENTLY) is not supported for partitioned tables"),
325 errhint("Consider running the command on individual partitions."));
326 }
327 else
330 errmsg("REPACK (CONCURRENTLY) requires an explicit table name"));
331 }
332
333 /*
334 * In order to avoid holding locks for too long, we want to process each
335 * table in its own transaction. This forces us to disallow running
336 * inside a user transaction block.
337 */
339
340 /* Also, we need a memory context to hold our list of relations */
342 "Repack",
344
345 /*
346 * Since we open a new transaction for each relation, we have to check
347 * that the relation still is what we think it is.
348 *
349 * In single-transaction CLUSTER, we don't need the overhead.
350 */
351 params.options |= CLUOPT_RECHECK;
352
353 /*
354 * If we don't have a relation yet, determine a relation list. If we do,
355 * then it must be a partitioned table, and we want to process its
356 * partitions.
357 */
358 if (rel == NULL)
359 {
360 Assert(stmt->indexname == NULL);
361 rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
364 }
365 else
366 {
367 Oid relid;
368 bool rel_is_index;
369
370 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
371
372 /*
373 * If USING INDEX was specified, resolve the index name now and pass
374 * it down.
375 */
376 if (stmt->usingindex)
377 {
378 /*
379 * If no index name was specified when repacking a partitioned
380 * table, punt for now. Maybe we can improve this later.
381 */
382 if (!stmt->indexname)
383 {
384 if (stmt->command == REPACK_COMMAND_CLUSTER)
387 errmsg("there is no previously clustered index for table \"%s\"",
389 else
392 /*- translator: first %s is name of a SQL command, eg. REPACK */
393 errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
394 RepackCommandAsString(stmt->command),
396 }
397
398 relid = determine_clustered_index(rel, stmt->usingindex,
399 stmt->indexname);
400 if (!OidIsValid(relid))
401 elog(ERROR, "unable to determine index to cluster on");
403
404 rel_is_index = true;
405 }
406 else
407 {
408 relid = RelationGetRelid(rel);
409 rel_is_index = false;
410 }
411
413 relid, rel_is_index,
415
416 /* close parent relation, releasing lock on it */
418 rel = NULL;
419 }
420
421 /* Commit to get out of starting transaction */
424
425 /* Cluster the tables, each in a separate transaction */
426 Assert(rel == NULL);
428 {
429 /* Start a new transaction for each relation. */
431
432 /*
433 * Open the target table, coping with the case where it has been
434 * dropped.
435 */
436 rel = try_table_open(rtc->tableOid, lockmode);
437 if (rel == NULL)
438 {
440 continue;
441 }
442
443 /* functions in indexes may want a snapshot set */
445
446 /* Process this table */
447 cluster_rel(stmt->command, rel, rtc->indexOid, &params, isTopLevel);
448 /* cluster_rel closes the relation, but keeps lock */
449
452 }
453
454 /* Start a new transaction for the cleanup work. */
456
457 /* Clean up working storage */
459}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errhint(const char *fmt,...) pg_attribute_printf(1
#define elog(elevel,...)
Definition elog.h:227
#define AccessExclusiveLock
Definition lockdefs.h:43
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
MemoryContext PortalContext
Definition mcxt.c:175
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
int parser_errposition(ParseState *pstate, int location)
Definition parse_node.c:106
@ REPACK_COMMAND_REPACK
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
#define foreach_node(type, var, lst)
Definition pg_list.h:528
static List * get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid, bool rel_is_index, MemoryContext permcxt)
Definition repack.c:2216
static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, ClusterParams *params)
Definition repack.c:2307
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:502
static List * get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
Definition repack.c:2086
static Oid determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
Definition repack.c:2398
#define CLUOPT_ANALYZE
Definition repack.h:28
#define CLUOPT_RECHECK_ISCLUSTERED
Definition repack.h:27
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
Definition pg_list.h:54
Form_pg_class rd_rel
Definition rel.h:111
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:60
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207

References AccessExclusiveLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, check_index_is_clusterable(), CLUOPT_ANALYZE, CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_RECHECK_ISCLUSTERED, CLUOPT_VERBOSE, cluster_rel(), CommitTransactionCommand(), defGetBoolean(), determine_clustered_index(), elog, ereport, errcode(), errhint(), errmsg, ERROR, fb(), foreach_node, foreach_ptr, get_tables_to_repack(), get_tables_to_repack_partitioned(), GetTransactionSnapshot(), MemoryContextDelete(), OidIsValid, ClusterParams::options, parser_errposition(), PopActiveSnapshot(), PortalContext, PreventInTransactionBlock(), process_single_relation(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, REPACK_COMMAND_CLUSTER, REPACK_COMMAND_REPACK, RepackCommandAsString(), RepackLockLevel(), StartTransactionCommand(), stmt, table_close(), and try_table_open().

Referenced by standard_ProcessUtility().

◆ finish_heap_swap()

void finish_heap_swap ( Oid  OIDOldHeap,
Oid  OIDNewHeap,
bool  is_system_catalog,
bool  swap_toast_by_content,
bool  check_constraints,
bool  is_internal,
bool  reindex,
TransactionId  frozenXid,
MultiXactId  cutoffMulti,
char  newrelpersistence 
)
extern

Definition at line 1865 of file repack.c.

1874{
1875 ObjectAddress object;
1876 Oid mapped_tables[4];
1877 int i;
1878
1879 /* Report that we are now swapping relation files */
1882
1883 /* Zero out possible results from swapped_relation_files */
1884 memset(mapped_tables, 0, sizeof(mapped_tables));
1885
1886 /*
1887 * Swap the contents of the heap relations (including any toast tables).
1888 * Also set old heap's relfrozenxid to frozenXid.
1889 */
1892 swap_toast_by_content, is_internal,
1894
1895 /*
1896 * If it's a system catalog, queue a sinval message to flush all catcaches
1897 * on the catalog when we reach CommandCounterIncrement.
1898 */
1901
1902 if (reindex)
1903 {
1904 int reindex_flags;
1906
1907 /*
1908 * Rebuild each index on the relation (but not the toast table, which
1909 * is all-new at this point). It is important to do this before the
1910 * DROP step because if we are processing a system catalog that will
1911 * be used during DROP, we want to have its indexes available. There
1912 * is no advantage to the other order anyway because this is all
1913 * transactional, so no chance to reclaim disk space before commit. We
1914 * do not need a final CommandCounterIncrement() because
1915 * reindex_relation does it.
1916 *
1917 * Note: because index_build is called via reindex_relation, it will
1918 * never set indcheckxmin true for the indexes. This is OK even
1919 * though in some sense we are building new indexes rather than
1920 * rebuilding existing ones, because the new heap won't contain any
1921 * HOT chains at all, let alone broken ones, so it can't be necessary
1922 * to set indcheckxmin.
1923 */
1927
1928 /*
1929 * Ensure that the indexes have the same persistence as the parent
1930 * relation.
1931 */
1932 if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1934 else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1936
1937 /* Report that we are now reindexing relations */
1940
1942 }
1943
1944 /* Report that we are now doing clean up */
1947
1948 /*
1949 * If the relation being rebuilt is pg_class, swap_relation_files()
1950 * couldn't update pg_class's own pg_class entry (check comments in
1951 * swap_relation_files()), thus relfrozenxid was not updated. That's
1952 * annoying because a potential reason for doing a VACUUM FULL is an
1953 * imminent or actual anti-wraparound shutdown. So, now that we can
1954 * access the new relation using its indices, update relfrozenxid.
1955 * pg_class doesn't have a toast relation, so we don't need to update the
1956 * corresponding toast relation. Note that there's little point moving all
1957 * relfrozenxid updates here since swap_relation_files() needs to write to
1958 * pg_class for non-mapped relations anyway.
1959 */
1961 {
1965
1967
1970 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1972
1973 relform->relfrozenxid = frozenXid;
1974 relform->relminmxid = cutoffMulti;
1975
1977
1979 }
1980
1981 /* Destroy new heap with old filenumber */
1982 object.classId = RelationRelationId;
1983 object.objectId = OIDNewHeap;
1984 object.objectSubId = 0;
1985
1986 if (!reindex)
1987 {
1988 /*
1989 * Make sure the changes in pg_class are visible. This is especially
1990 * important if !swap_toast_by_content, so that the correct TOAST
1991 * relation is dropped. (reindex_relation() above did not help in this
1992 * case.)
1993 */
1995 }
1996
1997 /*
1998 * The new relation is local to our transaction and we know nothing
1999 * depends on it, so DROP_RESTRICT should be OK.
2000 */
2002
2003 /* performDeletion does CommandCounterIncrement at end */
2004
2005 /*
2006 * Now we must remove any relation mapping entries that we set up for the
2007 * transient table, as well as its toast table and toast index if any. If
2008 * we fail to do this before commit, the relmapper will complain about new
2009 * permanent map entries being added post-bootstrap.
2010 */
2011 for (i = 0; OidIsValid(mapped_tables[i]); i++)
2013
2014 /*
2015 * At this point, everything is kosher except that, if we did toast swap
2016 * by links, the toast table's name corresponds to the transient table.
2017 * The name is irrelevant to the backend because it's referenced by OID,
2018 * but users looking at the catalogs could be confused. Rename it to
2019 * prevent this problem.
2020 *
2021 * Note no lock required on the relation, because we already hold an
2022 * exclusive lock on it.
2023 */
2025 {
2027
2029 if (OidIsValid(newrel->rd_rel->reltoastrelid))
2030 {
2031 Oid toastidx;
2033
2034 /* Get the associated valid index to be renamed */
2035 toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2037
2038 /* rename the toast table ... */
2039 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2040 OIDOldHeap);
2041 RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2042 NewToastName, true, false);
2043
2044 /* ... and its valid index too. */
2045 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2046 OIDOldHeap);
2047
2049 NewToastName, true, true);
2050
2051 /*
2052 * Reset the relrewrite for the toast. The command-counter
2053 * increment is required here as we are about to update the tuple
2054 * that is updated as part of RenameRelationInternal.
2055 */
2057 ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2058 }
2060 }
2061
2062 /* if it's not a catalog table, clear any missing attribute settings */
2063 if (!is_system_catalog)
2064 {
2066
2070 }
2071}
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition dependency.c:279
#define PERFORM_DELETION_INTERNAL
Definition dependency.h:92
void RelationClearMissing(Relation rel)
Definition heap.c:1964
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
bool reindex_relation(const ReindexStmt *stmt, Oid relid, int flags, const ReindexParams *params)
Definition index.c:3969
#define REINDEX_REL_FORCE_INDEXES_UNLOGGED
Definition index.h:169
#define REINDEX_REL_SUPPRESS_INDEX_USE
Definition index.h:167
#define REINDEX_REL_FORCE_INDEXES_PERMANENT
Definition index.h:170
#define REINDEX_REL_CHECK_CONSTRAINTS
Definition index.h:168
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CacheInvalidateCatalog(Oid catalogId)
Definition inval.c:1612
int i
Definition isn.c:77
#define RowExclusiveLock
Definition lockdefs.h:38
@ DROP_RESTRICT
FormData_pg_class * Form_pg_class
Definition pg_class.h:160
#define NAMEDATALEN
#define snprintf
Definition port.h:260
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
#define PROGRESS_REPACK_PHASE
Definition progress.h:86
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES
Definition progress.h:104
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP
Definition progress.h:106
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX
Definition progress.h:105
void RelationMapRemoveMapping(Oid relationId)
Definition relmapper.c:439
static void swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, bool swap_toast_by_content, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, Oid *mapped_tables)
Definition repack.c:1483
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ResetRelRewrite(Oid myrelid)
Definition tablecmds.c:4394
void RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bool is_index)
Definition tablecmds.c:4301
Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock)
void CommandCounterIncrement(void)
Definition xact.c:1130

References AccessExclusiveLock, CacheInvalidateCatalog(), CatalogTupleUpdate(), CommandCounterIncrement(), DROP_RESTRICT, elog, ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, i, NAMEDATALEN, NoLock, ObjectIdGetDatum(), OidIsValid, PERFORM_DELETION_INTERNAL, performDeletion(), pgstat_progress_update_param(), PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP, PROGRESS_REPACK_PHASE_REBUILD_INDEX, PROGRESS_REPACK_PHASE_SWAP_REL_FILES, REINDEX_REL_CHECK_CONSTRAINTS, REINDEX_REL_FORCE_INDEXES_PERMANENT, REINDEX_REL_FORCE_INDEXES_UNLOGGED, REINDEX_REL_SUPPRESS_INDEX_USE, reindex_relation(), relation_close(), RelationClearMissing(), RelationMapRemoveMapping(), RenameRelationInternal(), ResetRelRewrite(), RowExclusiveLock, SearchSysCacheCopy1, snprintf, swap_relation_files(), table_close(), table_open(), and toast_get_valid_index().

Referenced by ATRewriteTables(), rebuild_relation(), rebuild_relation_finish_concurrent(), and refresh_by_heap_swap().

◆ HandleRepackMessageInterrupt()

void HandleRepackMessageInterrupt ( void  )
extern

Definition at line 3508 of file repack.c.

3509{
3510 InterruptPending = true;
3511 RepackMessagePending = true;
3513}
volatile sig_atomic_t InterruptPending
Definition globals.c:32
struct Latch * MyLatch
Definition globals.c:63
void SetLatch(Latch *latch)
Definition latch.c:290
volatile sig_atomic_t RepackMessagePending
Definition repack.c:146

References InterruptPending, MyLatch, RepackMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ make_new_heap()

Oid make_new_heap ( Oid  OIDOldHeap,
Oid  NewTableSpace,
Oid  NewAccessMethod,
char  relpersistence,
LOCKMODE  lockmode 
)
extern

Definition at line 1108 of file repack.c.

1110{
1114 Oid toastid;
1116 HeapTuple tuple;
1117 Datum reloptions;
1118 bool isNull;
1120
1121 OldHeap = table_open(OIDOldHeap, lockmode);
1123
1124 /*
1125 * Note that the NewHeap will not receive any of the defaults or
1126 * constraints associated with the OldHeap; we don't need 'em, and there's
1127 * no reason to spend cycles inserting them into the catalogs only to
1128 * delete them.
1129 */
1130
1131 /*
1132 * But we do want to use reloptions of the old heap for new heap.
1133 */
1135 if (!HeapTupleIsValid(tuple))
1136 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1137 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1138 &isNull);
1139 if (isNull)
1140 reloptions = (Datum) 0;
1141
1142 if (relpersistence == RELPERSISTENCE_TEMP)
1144 else
1146
1147 /*
1148 * Create the new heap, using a temporary name in the same namespace as
1149 * the existing table. NOTE: there is some risk of collision with user
1150 * relnames. Working around this seems more trouble than it's worth; in
1151 * particular, we can't create the new heap in a different namespace from
1152 * the old, or we will have problems with the TEMP status of temp tables.
1153 *
1154 * Note: the new heap is not a shared relation, even if we are rebuilding
1155 * a shared rel. However, we do make the new heap mapped if the source is
1156 * mapped. This simplifies swap_relation_files, and is absolutely
1157 * necessary for rebuilding pg_class, for reasons explained there.
1158 */
1159 snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1160
1164 InvalidOid,
1165 InvalidOid,
1166 InvalidOid,
1167 OldHeap->rd_rel->relowner,
1170 NIL,
1172 relpersistence,
1173 false,
1176 reloptions,
1177 false,
1178 true,
1179 true,
1180 OIDOldHeap,
1181 NULL);
1183
1184 ReleaseSysCache(tuple);
1185
1186 /*
1187 * Advance command counter so that the newly-created relation's catalog
1188 * tuples will be visible to table_open.
1189 */
1191
1192 /*
1193 * If necessary, create a TOAST table for the new relation.
1194 *
1195 * If the relation doesn't have a TOAST table already, we can't need one
1196 * for the new relation. The other way around is possible though: if some
1197 * wide columns have been dropped, NewHeapCreateToastTable can decide that
1198 * no TOAST table is needed for the new table.
1199 *
1200 * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1201 * that the TOAST table will be visible for insertion.
1202 */
1203 toastid = OldHeap->rd_rel->reltoastrelid;
1204 if (OidIsValid(toastid))
1205 {
1206 /* keep the existing toast table's reloptions, if any */
1208 if (!HeapTupleIsValid(tuple))
1209 elog(ERROR, "cache lookup failed for relation %u", toastid);
1210 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1211 &isNull);
1212 if (isNull)
1213 reloptions = (Datum) 0;
1214
1215 NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1216
1217 ReleaseSysCache(tuple);
1218 }
1219
1221
1222 return OIDNewHeap;
1223}
Oid heap_create_with_catalog(const char *relname, Oid relnamespace, Oid reltablespace, Oid relid, Oid reltypeid, Oid reloftypeid, Oid ownerid, Oid accessmtd, TupleDesc tupdesc, List *cooked_constraints, char relkind, char relpersistence, bool shared_relation, bool mapped_relation, OnCommitAction oncommit, Datum reloptions, bool use_user_acl, bool allow_system_table_mods, bool is_internal, Oid relrewrite, ObjectAddress *typaddress)
Definition heap.c:1122
Oid LookupCreationNamespace(const char *nspname)
Definition namespace.c:3500
#define NIL
Definition pg_list.h:68
@ ONCOMMIT_NOOP
Definition primnodes.h:59
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationIsMapped(relation)
Definition rel.h:565
#define RelationGetNamespace(relation)
Definition rel.h:557
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
void NewHeapCreateToastTable(Oid relOid, Datum reloptions, LOCKMODE lockmode, Oid OIDOldToast)
Definition toasting.c:65

References Assert, CommandCounterIncrement(), elog, ERROR, fb(), heap_create_with_catalog(), HeapTupleIsValid, InvalidOid, LookupCreationNamespace(), NAMEDATALEN, NewHeapCreateToastTable(), NIL, NoLock, ObjectIdGetDatum(), OidIsValid, ONCOMMIT_NOOP, RelationGetDescr, RelationGetNamespace, RelationIsMapped, ReleaseSysCache(), SearchSysCache1(), snprintf, SysCacheGetAttr(), table_close(), and table_open().

Referenced by ATRewriteTables(), rebuild_relation(), and RefreshMatViewByOid().

◆ mark_index_clustered()

void mark_index_clustered ( Relation  rel,
Oid  indexOid,
bool  is_internal 
)
extern

Definition at line 811 of file repack.c.

812{
817
818 Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
819
820 /*
821 * If the index is already marked clustered, no need to do anything.
822 */
823 if (OidIsValid(indexOid))
824 {
825 if (get_index_isclustered(indexOid))
826 return;
827 }
828
829 /*
830 * Check each index of the relation and set/clear the bit as needed.
831 */
833
834 foreach(index, RelationGetIndexList(rel))
835 {
837
841 elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
843
844 /*
845 * Unset the bit if set. We know it's wrong because we checked this
846 * earlier.
847 */
848 if (indexForm->indisclustered)
849 {
850 indexForm->indisclustered = false;
852 }
853 else if (thisIndexOid == indexOid)
854 {
855 /* this was checked earlier, but let's be real sure */
856 if (!indexForm->indisvalid)
857 elog(ERROR, "cannot cluster on invalid index %u", indexOid);
858 indexForm->indisclustered = true;
860 }
861
863 InvalidOid, is_internal);
864
866 }
867
869}
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
bool get_index_isclustered(Oid index_oid)
Definition lsyscache.c:3848
#define InvokeObjectPostAlterHookArg(classId, objectId, subId, auxiliaryId, is_internal)
END_CATALOG_STRUCT typedef FormData_pg_index * Form_pg_index
Definition pg_index.h:74
#define lfirst_oid(lc)
Definition pg_list.h:174
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4827

References Assert, CatalogTupleUpdate(), elog, ERROR, fb(), Form_pg_index, get_index_isclustered(), GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, InvalidOid, InvokeObjectPostAlterHookArg, lfirst_oid, ObjectIdGetDatum(), OidIsValid, RelationData::rd_rel, RelationGetIndexList(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ATExecClusterOn(), ATExecDropCluster(), and rebuild_relation().

◆ ProcessRepackMessages()

void ProcessRepackMessages ( void  )
extern

Definition at line 3519 of file repack.c.

3520{
3521 MemoryContext oldcontext;
3523
3524 /*
3525 * Nothing to do if we haven't launched the worker yet or have already
3526 * terminated it.
3527 */
3528 if (decoding_worker == NULL)
3529 return;
3530
3531 /*
3532 * This is invoked from ProcessInterrupts(), and since some of the
3533 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3534 * for recursive calls if more signals are received while this runs. It's
3535 * unclear that recursive entry would be safe, and it doesn't seem useful
3536 * even if it is safe, so let's block interrupts until done.
3537 */
3539
3540 /*
3541 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3542 * don't want to risk leaking data into long-lived contexts, so let's do
3543 * our work here in a private context that we can reset on each use.
3544 */
3545 if (hpm_context == NULL) /* first time through? */
3547 "ProcessRepackMessages",
3549 else
3551
3552 oldcontext = MemoryContextSwitchTo(hpm_context);
3553
3554 /* OK to process messages. Reset the flag saying there are more to do. */
3555 RepackMessagePending = false;
3556
3557 /*
3558 * Read as many messages as we can from the worker, but stop when no more
3559 * messages can be read from the worker without blocking.
3560 */
3561 while (true)
3562 {
3563 shm_mq_result res;
3564 Size nbytes;
3565 void *data;
3566
3568 &data, true);
3569 if (res == SHM_MQ_WOULD_BLOCK)
3570 break;
3571 else if (res == SHM_MQ_SUCCESS)
3572 {
3573 StringInfoData msg;
3574
3575 initStringInfo(&msg);
3576 appendBinaryStringInfo(&msg, data, nbytes);
3578 pfree(msg.data);
3579 }
3580 else
3581 {
3582 /*
3583 * The decoding worker is special in that it exits as soon as it
3584 * has its work done. Thus the DETACHED result code is fine.
3585 */
3586 Assert(res == SHM_MQ_DETACHED);
3587
3588 break;
3589 }
3590 }
3591
3592 MemoryContextSwitchTo(oldcontext);
3593
3594 /* Might as well clear the context on our way out */
3596
3598}
size_t Size
Definition c.h:689
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
#define RESUME_INTERRUPTS()
Definition miscadmin.h:136
#define HOLD_INTERRUPTS()
Definition miscadmin.h:134
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const void * data
static void ProcessRepackMessage(StringInfo msg)
Definition repack.c:3604
static DecodingWorker * decoding_worker
Definition repack.c:140
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
shm_mq_handle * error_mqh
Definition repack.c:136

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), Assert, StringInfoData::data, data, decoding_worker, DecodingWorker::error_mqh, fb(), HOLD_INTERRUPTS, initStringInfo(), MemoryContextReset(), MemoryContextSwitchTo(), pfree(), ProcessRepackMessage(), RepackMessagePending, RESUME_INTERRUPTS, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ RepackWorkerMain()

void RepackWorkerMain ( Datum  main_arg)
extern

Definition at line 57 of file repack_worker.c.

58{
59 dsm_segment *seg;
61 shm_mq *mq;
64 SharedFileSet *sfs;
65 Snapshot snapshot;
66
67 am_repack_worker = true;
68
69 /*
70 * Override the default bgworker_die() with die() so we can use
71 * CHECK_FOR_INTERRUPTS().
72 */
75
77 if (seg == NULL)
80 errmsg("could not map dynamic shared memory segment"));
81
83 shared->dsm_seg = seg;
84
85 /* Arrange to signal the leader if we exit. */
87
88 /*
89 * Join locking group - see the comments around the call of
90 * start_repack_decoding_worker().
91 */
92 if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
93 return; /* The leader is not running anymore. */
94
95 /*
96 * Set up a queue to send error messages to the backend that launched this
97 * worker.
98 */
99 mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
101 mqh = shm_mq_attach(mq, seg, NULL);
104 shared->backend_proc_number);
105
106 /* Connect to the database. */
108
109 /*
110 * Transaction is needed to open relation, and it also provides us with a
111 * resource owner.
112 */
114
116
117 /*
118 * Not sure the spinlock is needed here - the backend should not change
119 * anything in the shared memory until we have serialized the snapshot.
120 */
121 SpinLockAcquire(&shared->mutex);
123 sfs = &shared->sfs;
124 SpinLockRelease(&shared->mutex);
125
126 SharedFileSetAttach(sfs, seg);
127
128 /*
129 * Prepare to capture the concurrent data changes ourselves.
130 */
132
133 /* Announce that we're ready. */
134 SpinLockAcquire(&shared->mutex);
135 shared->initialized = true;
136 SpinLockRelease(&shared->mutex);
137 ConditionVariableSignal(&shared->cv);
138
139 /* There doesn't seem to be a nice API to set these */
141 XactReadOnly = true;
142
143 /* Build the initial snapshot and export it. */
144 snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
145 export_initial_snapshot(snapshot, shared);
146
147 /*
148 * Only historic snapshots should be used now. Do not let us restrict the
149 * progress of the xmin horizon.
150 */
152
153 for (;;)
154 {
155 bool stop = decode_concurrent_changes(decoding_ctx, shared);
156
157 if (stop)
158 break;
159
160 }
161
162 /* Cleanup. */
165}
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
#define BUFFERALIGN(LEN)
Definition c.h:898
void ConditionVariableSignal(ConditionVariable *cv)
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1103
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:673
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
#define die(msg)
#define pqsignal
Definition port.h:547
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
static Datum PointerGetDatum(const void *X)
Definition postgres.h:342
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:85
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:56
static void RepackWorkerShutdown(int code, Datum arg)
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared)
static void export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
static LogicalDecodingContext * repack_setup_logical_decoding(Oid relid)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:443
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:71
bool BecomeLockGroupMember(PGPROC *leader, int pid)
Definition proc.c:2072
ConditionVariable cv
char error_queue[FLEXIBLE_ARRAY_MEMBER]
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References am_repack_worker, Assert, DecodingWorkerShared::backend_pid, DecodingWorkerShared::backend_proc, DecodingWorkerShared::backend_proc_number, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BUFFERALIGN, CommitTransactionCommand(), ConditionVariableSignal(), DecodingWorkerShared::cv, DatumGetUInt32(), DecodingWorkerShared::dbid, decode_concurrent_changes(), die, dsm_attach(), DecodingWorkerShared::dsm_seg, dsm_segment_address(), ereport, errcode(), errmsg, ERROR, DecodingWorkerShared::error_queue, export_initial_snapshot(), fb(), DecodingWorkerShared::initialized, InvalidateCatalogSnapshot(), DecodingWorkerShared::lsn_upto, DecodingWorkerShared::mutex, MyProc, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, DecodingWorkerShared::relid, repack_cleanup_logical_decoding(), repack_setup_logical_decoding(), RepackWorkerShutdown(), DecodingWorkerShared::roleid, DecodingWorkerShared::sfs, SharedFileSetAttach(), shm_mq_attach(), shm_mq_set_sender(), SnapBuildInitialSnapshot(), SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XLogRecPtrIsValid.

Variable Documentation

◆ RepackMessagePending

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
extern