PostgreSQL Source Code git master
Loading...
Searching...
No Matches
repack.h File Reference
#include <signal.h>
#include "nodes/parsenodes.h"
#include "parser/parse_node.h"
#include "storage/lockdefs.h"
#include "utils/relcache.h"
Include dependency graph for repack.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ClusterParams
 

Macros

#define CLUOPT_VERBOSE   0x01 /* print progress info */
 
#define CLUOPT_RECHECK   0x02 /* recheck relation state */
 
#define CLUOPT_RECHECK_ISCLUSTERED
 
#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */
 
#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */
 

Typedefs

typedef struct ClusterParams ClusterParams
 

Functions

void ExecRepack (ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
 
void cluster_rel (RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
 
void check_index_is_clusterable (Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
 
void mark_index_clustered (Relation rel, Oid indexOid, bool is_internal)
 
Oid make_new_heap (Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
 
void finish_heap_swap (Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
 
void HandleRepackMessageInterrupt (void)
 
void ProcessRepackMessages (void)
 
void RepackWorkerMain (Datum main_arg)
 
bool AmRepackWorker (void)
 

Variables

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
 

Macro Definition Documentation

◆ CLUOPT_ANALYZE

#define CLUOPT_ANALYZE   0x08 /* do an ANALYZE */

Definition at line 28 of file repack.h.

◆ CLUOPT_CONCURRENT

#define CLUOPT_CONCURRENT   0x10 /* allow concurrent data changes */

Definition at line 29 of file repack.h.

◆ CLUOPT_RECHECK

#define CLUOPT_RECHECK   0x02 /* recheck relation state */

Definition at line 26 of file repack.h.

◆ CLUOPT_RECHECK_ISCLUSTERED

#define CLUOPT_RECHECK_ISCLUSTERED
Value:
0x04 /* recheck relation state for
* indisclustered */

Definition at line 27 of file repack.h.

34{
35 uint32 options; /* bitmask of CLUOPT_* */
37
39
40
41extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
42
43extern void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
44 ClusterParams *params, bool isTopLevel);
46 LOCKMODE lockmode);
47extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
48
50 char relpersistence, LOCKMODE lockmode);
55 bool is_internal,
56 bool reindex,
59 char newrelpersistence);
60
61extern void HandleRepackMessageInterrupt(void);
62extern void ProcessRepackMessages(void);
63
64/* in repack_worker.c */
66extern bool AmRepackWorker(void);
67
68#endif /* REPACK_H */
#define PGDLLIMPORT
Definition c.h:1421
TransactionId MultiXactId
Definition c.h:746
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
#define stmt
int LOCKMODE
Definition lockdefs.h:26
RepackCommand
uint64_t Datum
Definition postgres.h:70
unsigned int Oid
static int fb(int x)
bool AmRepackWorker(void)
void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, bool check_constraints, bool is_internal, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence)
Definition repack.c:1882
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:767
void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
Definition repack.c:244
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:518
Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode)
Definition repack.c:1125
void HandleRepackMessageInterrupt(void)
Definition repack.c:3612
void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
Definition repack.c:827
void RepackWorkerMain(Datum main_arg)
PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
Definition repack.c:150
void ProcessRepackMessages(void)
Definition repack.c:3623

◆ CLUOPT_VERBOSE

#define CLUOPT_VERBOSE   0x01 /* print progress info */

Definition at line 25 of file repack.h.

Typedef Documentation

◆ ClusterParams

Function Documentation

◆ AmRepackWorker()

bool AmRepackWorker ( void  )
extern

Definition at line 187 of file repack_worker.c.

188{
189 return am_repack_worker;
190}
static bool am_repack_worker

References am_repack_worker.

Referenced by mq_putmessage().

◆ check_index_is_clusterable()

void check_index_is_clusterable ( Relation  OldHeap,
Oid  indexOid,
LOCKMODE  lockmode 
)
extern

Definition at line 767 of file repack.c.

768{
770
771 OldIndex = index_open(indexOid, lockmode);
772
773 /*
774 * Check that index is in fact an index on the given relation
775 */
776 if (OldIndex->rd_index == NULL ||
777 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
780 errmsg("\"%s\" is not an index for table \"%s\"",
783
784 /* Index AM must allow clustering */
785 if (!OldIndex->rd_indam->amclusterable)
788 errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
790
791 /*
792 * Disallow clustering on incomplete indexes (those that might not index
793 * every row of the relation). We could relax this by making a separate
794 * seqscan pass over the table to copy the missing rows, but that seems
795 * expensive and tedious.
796 */
797 if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred, NULL))
800 errmsg("cannot cluster on partial index \"%s\"",
802
803 /*
804 * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
805 * it might well not contain entries for every heap row, or might not even
806 * be internally consistent. (But note that we don't check indcheckxmin;
807 * the worst consequence of following broken HOT chains would be that we
808 * might put recently-dead tuples out-of-order in the new table, and there
809 * is little harm in that.)
810 */
811 if (!OldIndex->rd_index->indisvalid)
814 errmsg("cannot cluster on invalid index \"%s\"",
816
817 /* Drop relcache refcnt on OldIndex, but keep lock */
819}
int errcode(int sqlerrcode)
Definition elog.c:875
#define ERROR
Definition elog.h:40
#define ereport(elevel,...)
Definition elog.h:152
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition heaptuple.c:456
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
#define NoLock
Definition lockdefs.h:34
static char * errmsg
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetRelationName(relation)
Definition rel.h:550

References ereport, errcode(), errmsg, ERROR, fb(), heap_attisnull(), index_close(), index_open(), NoLock, RelationGetRelationName, and RelationGetRelid.

Referenced by ATExecClusterOn(), cluster_rel(), ExecRepack(), and process_single_relation().

◆ cluster_rel()

void cluster_rel ( RepackCommand  cmd,
Relation  OldHeap,
Oid  indexOid,
ClusterParams *params,
bool  isTopLevel 
)
extern

Definition at line 518 of file repack.c.

520{
521 Oid tableOid = RelationGetRelid(OldHeap);
524 Oid save_userid;
525 int save_sec_context;
526 int save_nestlevel;
527 bool verbose = ((params->options & CLUOPT_VERBOSE) != 0);
528 bool recheck = ((params->options & CLUOPT_RECHECK) != 0);
529 bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
531
532 /* Determine the lock mode to use. */
533 lmode = RepackLockLevel(concurrent);
534
535 /*
536 * Check some preconditions in the concurrent case. This also obtains the
537 * replica index OID.
538 */
539 if (concurrent)
541
542 /* Check for user-requested abort. */
544
547
548 /*
549 * Switch to the table owner's userid, so that any index functions are run
550 * as that user. Also lock down security-restricted operations and
551 * arrange to make GUC variable changes local to this command.
552 */
553 GetUserIdAndSecContext(&save_userid, &save_sec_context);
554 SetUserIdAndSecContext(OldHeap->rd_rel->relowner,
555 save_sec_context | SECURITY_RESTRICTED_OPERATION);
556 save_nestlevel = NewGUCNestLevel();
558
559 /*
560 * Recheck that the relation is still what it was when we started.
561 *
562 * Note that it's critical to skip this in single-relation CLUSTER;
563 * otherwise, we would reject an attempt to cluster using a
564 * not-previously-clustered index.
565 */
566 if (recheck &&
567 !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
568 lmode, params->options))
569 goto out;
570
571 /*
572 * We allow repacking shared catalogs only when not using an index. It
573 * would work to use an index in most respects, but the index would only
574 * get marked as indisclustered in the current database, leading to
575 * unexpected behavior if CLUSTER were later invoked in another database.
576 */
577 if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
580 /*- translator: first %s is name of a SQL command, eg. REPACK */
581 errmsg("cannot execute %s on a shared catalog",
583
584 /*
585 * The CONCURRENTLY case should have been rejected earlier because it does
586 * not support system catalogs.
587 */
588 Assert(!(OldHeap->rd_rel->relisshared && concurrent));
589
590 /*
591 * Don't process temp tables of other backends ... their local buffer
592 * manager is not going to cope.
593 */
597 /*- translator: first %s is name of a SQL command, eg. REPACK */
598 errmsg("cannot execute %s on temporary tables of other sessions",
600
601 /*
602 * Also check for active uses of the relation in the current transaction,
603 * including open scans and pending AFTER trigger events.
604 */
606
607 /* Check heap and index are valid to cluster on */
608 if (OidIsValid(indexOid))
609 {
610 /* verify the index is good and lock it */
612 /* also open it */
613 index = index_open(indexOid, NoLock);
614 }
615 else
616 index = NULL;
617
618 /*
619 * When allow_system_table_mods is turned off, we disallow repacking a
620 * catalog on a particular index unless that's already the clustered index
621 * for that catalog.
622 *
623 * XXX We don't check for this in CLUSTER, because it's historically been
624 * allowed.
625 */
626 if (cmd != REPACK_COMMAND_CLUSTER &&
627 !allowSystemTableMods && OidIsValid(indexOid) &&
628 IsCatalogRelation(OldHeap) && !index->rd_index->indisclustered)
631 errmsg("permission denied: \"%s\" is a system catalog",
633 errdetail("System catalogs can only be clustered by the index they're already clustered on, if any, unless \"%s\" is enabled.",
634 "allow_system_table_mods"));
635
636 /*
637 * Quietly ignore the request if this is a materialized view which has not
638 * been populated from its query. No harm is done because there is no data
639 * to deal with, and we don't want to throw an error if this is part of a
640 * multi-relation request -- for example, CLUSTER was run on the entire
641 * database.
642 */
643 if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
645 {
646 if (index)
649 goto out;
650 }
651
652 Assert(OldHeap->rd_rel->relkind == RELKIND_RELATION ||
653 OldHeap->rd_rel->relkind == RELKIND_MATVIEW ||
654 OldHeap->rd_rel->relkind == RELKIND_TOASTVALUE);
655
656 /*
657 * All predicate locks on the tuples or pages are about to be made
658 * invalid, because we move tuples around. Promote them to relation
659 * locks. Predicate locks on indexes will be promoted when they are
660 * reindexed.
661 *
662 * During concurrent processing, the heap as well as its indexes stay in
663 * operation, so we postpone this step until they are locked using
664 * AccessExclusiveLock near the end of the processing.
665 */
666 if (!concurrent)
668
669 /* rebuild_relation does all the dirty work */
670 PG_TRY();
671 {
673 }
674 PG_FINALLY();
675 {
676 if (concurrent)
677 {
678 /*
679 * Since during normal operation the worker was already asked to
680 * exit, stopping it explicitly is especially important on ERROR.
681 * However it still seems a good practice to make sure that the
682 * worker never survives the REPACK command.
683 */
685 }
686 }
687 PG_END_TRY();
688
689 /* rebuild_relation closes OldHeap, and index if valid */
690
691out:
692 /* Roll back any GUC changes executed by index functions */
693 AtEOXact_GUC(false, save_nestlevel);
694
695 /* Restore userid and security context */
696 SetUserIdAndSecContext(save_userid, save_sec_context);
697
699}
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_param(int index, int64 val)
void pgstat_progress_end_command(void)
@ PROGRESS_COMMAND_REPACK
#define Assert(condition)
Definition c.h:943
#define OidIsValid(objectId)
Definition c.h:858
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define PG_END_TRY(...)
Definition elog.h:399
#define PG_FINALLY(...)
Definition elog.h:391
bool allowSystemTableMods
Definition globals.c:132
int NewGUCNestLevel(void)
Definition guc.c:2142
void RestrictSearchPath(void)
Definition guc.c:2153
void AtEOXact_GUC(bool isCommit, int nestLevel)
Definition guc.c:2169
#define SECURITY_RESTRICTED_OPERATION
Definition miscadmin.h:331
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Definition miscinit.c:613
void SetUserIdAndSecContext(Oid userid, int sec_context)
Definition miscinit.c:620
@ REPACK_COMMAND_CLUSTER
static int verbose
#define InvalidOid
void TransferPredicateLocksToHeapRelation(Relation relation)
Definition predicate.c:3052
#define PROGRESS_REPACK_COMMAND
Definition progress.h:85
#define RelationIsPopulated(relation)
Definition rel.h:697
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:678
static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid, Oid userid, LOCKMODE lmode, int options)
Definition repack.c:706
static void check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p)
Definition repack.c:893
void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode)
Definition repack.c:767
static void stop_repack_decoding_worker(void)
Definition repack.c:3499
static LOCKMODE RepackLockLevel(bool concurrent)
Definition repack.c:485
static const char * RepackCommandAsString(RepackCommand cmd)
Definition repack.c:2481
static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid ident_idx)
Definition repack.c:978
#define CLUOPT_VERBOSE
Definition repack.h:25
#define CLUOPT_CONCURRENT
Definition repack.h:29
#define CLUOPT_RECHECK
Definition repack.h:26
void relation_close(Relation relation, LOCKMODE lockmode)
Definition relation.c:206
uint32 options
Definition repack.h:34
Definition type.h:97
void CheckTableNotInUse(Relation rel, const char *stmt)
Definition tablecmds.c:4473

References allowSystemTableMods, Assert, AtEOXact_GUC(), check_concurrent_repack_requirements(), CHECK_FOR_INTERRUPTS, check_index_is_clusterable(), CheckTableNotInUse(), CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_VERBOSE, cluster_rel_recheck(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetUserIdAndSecContext(), index_close(), index_open(), InvalidOid, IsCatalogRelation(), NewGUCNestLevel(), NoLock, OidIsValid, ClusterParams::options, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_progress_end_command(), pgstat_progress_start_command(), pgstat_progress_update_param(), PROGRESS_COMMAND_REPACK, PROGRESS_REPACK_COMMAND, rebuild_relation(), relation_close(), RELATION_IS_OTHER_TEMP, RelationGetRelationName, RelationGetRelid, RelationIsPopulated, REPACK_COMMAND_CLUSTER, RepackCommandAsString(), RepackLockLevel(), RestrictSearchPath(), SECURITY_RESTRICTED_OPERATION, SetUserIdAndSecContext(), stop_repack_decoding_worker(), TransferPredicateLocksToHeapRelation(), and verbose.

Referenced by ExecRepack(), process_single_relation(), and vacuum_rel().

◆ ExecRepack()

void ExecRepack ( ParseState *pstate,
RepackStmt *stmt,
bool  isTopLevel 
)
extern

Definition at line 244 of file repack.c.

245{
246 ClusterParams params = {0};
247 Relation rel = NULL;
249 LOCKMODE lockmode;
250 List *rtcs;
251 bool verbose = false;
252 bool analyze = false;
253 bool concurrently = false;
254
255 /* Parse option list */
256 foreach_node(DefElem, opt, stmt->params)
257 {
258 if (strcmp(opt->defname, "verbose") == 0)
259 verbose = defGetBoolean(opt);
260 else if (strcmp(opt->defname, "analyze") == 0 ||
261 strcmp(opt->defname, "analyse") == 0)
262 analyze = defGetBoolean(opt);
263 else if (strcmp(opt->defname, "concurrently") == 0)
264 {
265 if (stmt->command != REPACK_COMMAND_REPACK)
268 errmsg("CONCURRENTLY option not supported for %s",
269 RepackCommandAsString(stmt->command)));
271 }
272 else
275 errmsg("unrecognized %s option \"%s\"",
276 RepackCommandAsString(stmt->command),
277 opt->defname),
278 parser_errposition(pstate, opt->location));
279 }
280
281 params.options |=
282 (verbose ? CLUOPT_VERBOSE : 0) |
283 (analyze ? CLUOPT_ANALYZE : 0) |
285
286 /* Determine the lock mode to use. */
287 lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
288
289 if ((params.options & CLUOPT_CONCURRENT) != 0)
290 {
291 /*
292 * Make sure we're not in a transaction block.
293 *
294 * The reason is that repack_setup_logical_decoding() could wait
295 * indefinitely for our XID to complete. (The deadlock detector would
296 * not recognize it because we'd be waiting for ourselves, i.e. no
297 * real lock conflict.) It would be possible to run in a transaction
298 * block if we had no XID, but this restriction is simpler for users
299 * to understand and we don't lose any functionality.
300 */
301 PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
302 }
303
304 /*
305 * If a single relation is specified, process it and we're done ... unless
306 * the relation is a partitioned table, in which case we fall through.
307 */
308 if (stmt->relation != NULL)
309 {
310 rel = process_single_relation(stmt, lockmode, isTopLevel, &params);
311 if (rel == NULL)
312 return; /* all done */
313 }
314
315 /*
316 * Don't allow ANALYZE in the multiple-relation case for now. Maybe we
317 * can add support for this later.
318 */
319 if (params.options & CLUOPT_ANALYZE)
322 errmsg("cannot execute %s on multiple tables",
323 "REPACK (ANALYZE)"));
324
325 /*
326 * By here, we know we are in a multi-table situation.
327 *
328 * Concurrent processing is currently considered rather special (e.g. in
329 * terms of resources consumed) so it is not performed in bulk.
330 */
331 if (params.options & CLUOPT_CONCURRENT)
332 {
333 if (rel != NULL)
334 {
335 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
338 errmsg("%s is not supported for partitioned tables",
339 "REPACK (CONCURRENTLY)"),
340 errhint("Consider running the command on individual partitions."));
341 }
342 else
345 errmsg("%s requires an explicit table name",
346 "REPACK (CONCURRENTLY)"));
347 }
348
349 /*
350 * In order to avoid holding locks for too long, we want to process each
351 * table in its own transaction. This forces us to disallow running
352 * inside a user transaction block.
353 */
355
356 /* Also, we need a memory context to hold our list of relations */
358 "Repack",
360
361 /*
362 * Since we open a new transaction for each relation, we have to check
363 * that the relation still is what we think it is.
364 *
365 * In single-transaction CLUSTER, we don't need the overhead.
366 */
367 params.options |= CLUOPT_RECHECK;
368
369 /*
370 * If we don't have a relation yet, determine a relation list. If we do,
371 * then it must be a partitioned table, and we want to process its
372 * partitions.
373 */
374 if (rel == NULL)
375 {
376 Assert(stmt->indexname == NULL);
377 rtcs = get_tables_to_repack(stmt->command, stmt->usingindex,
380 }
381 else
382 {
383 Oid relid;
384 bool rel_is_index;
385
386 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
387
388 /*
389 * If USING INDEX was specified, resolve the index name now and pass
390 * it down.
391 */
392 if (stmt->usingindex)
393 {
394 /*
395 * If no index name was specified when repacking a partitioned
396 * table, punt for now. Maybe we can improve this later.
397 */
398 if (!stmt->indexname)
399 {
400 if (stmt->command == REPACK_COMMAND_CLUSTER)
403 errmsg("there is no previously clustered index for table \"%s\"",
405 else
408 /*- translator: first %s is name of a SQL command, eg. REPACK */
409 errmsg("cannot execute %s on partitioned table \"%s\" USING INDEX with no index name",
410 RepackCommandAsString(stmt->command),
412 }
413
414 relid = determine_clustered_index(rel, stmt->usingindex,
415 stmt->indexname);
416 if (!OidIsValid(relid))
417 elog(ERROR, "unable to determine index to cluster on");
419
420 rel_is_index = true;
421 }
422 else
423 {
424 relid = RelationGetRelid(rel);
425 rel_is_index = false;
426 }
427
429 relid, rel_is_index,
431
432 /* close parent relation, releasing lock on it */
434 rel = NULL;
435 }
436
437 /* Commit to get out of starting transaction */
440
441 /* Cluster the tables, each in a separate transaction */
442 Assert(rel == NULL);
444 {
445 /* Start a new transaction for each relation. */
447
448 /*
449 * Open the target table, coping with the case where it has been
450 * dropped.
451 */
452 rel = try_table_open(rtc->tableOid, lockmode);
453 if (rel == NULL)
454 {
456 continue;
457 }
458
459 /* functions in indexes may want a snapshot set */
461
462 /* Process this table */
463 cluster_rel(stmt->command, rel, rtc->indexOid, &params, isTopLevel);
464 /* cluster_rel closes the relation, but keeps lock */
465
468 }
469
470 /* Start a new transaction for the cleanup work. */
472
473 /* Clean up working storage */
475}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errhint(const char *fmt,...) pg_attribute_printf(1
#define elog(elevel,...)
Definition elog.h:228
#define AccessExclusiveLock
Definition lockdefs.h:43
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
MemoryContext PortalContext
Definition mcxt.c:176
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
int parser_errposition(ParseState *pstate, int location)
Definition parse_node.c:106
@ REPACK_COMMAND_REPACK
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
#define foreach_node(type, var, lst)
Definition pg_list.h:528
static long analyze(struct nfa *nfa)
Definition regc_nfa.c:3051
static List * get_tables_to_repack_partitioned(RepackCommand cmd, Oid relid, bool rel_is_index, MemoryContext permcxt)
Definition repack.c:2256
static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, ClusterParams *params)
Definition repack.c:2347
void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, ClusterParams *params, bool isTopLevel)
Definition repack.c:518
static List * get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt)
Definition repack.c:2103
static Oid determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
Definition repack.c:2438
#define CLUOPT_ANALYZE
Definition repack.h:28
#define CLUOPT_RECHECK_ISCLUSTERED
Definition repack.h:27
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
Definition pg_list.h:54
Form_pg_class rd_rel
Definition rel.h:111
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:60
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207

References AccessExclusiveLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, analyze(), Assert, check_index_is_clusterable(), CLUOPT_ANALYZE, CLUOPT_CONCURRENT, CLUOPT_RECHECK, CLUOPT_RECHECK_ISCLUSTERED, CLUOPT_VERBOSE, cluster_rel(), CommitTransactionCommand(), defGetBoolean(), determine_clustered_index(), elog, ereport, errcode(), errhint(), errmsg, ERROR, fb(), foreach_node, foreach_ptr, get_tables_to_repack(), get_tables_to_repack_partitioned(), GetTransactionSnapshot(), MemoryContextDelete(), OidIsValid, ClusterParams::options, parser_errposition(), PopActiveSnapshot(), PortalContext, PreventInTransactionBlock(), process_single_relation(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, REPACK_COMMAND_CLUSTER, REPACK_COMMAND_REPACK, RepackCommandAsString(), RepackLockLevel(), StartTransactionCommand(), stmt, table_close(), try_table_open(), and verbose.

Referenced by standard_ProcessUtility().

◆ finish_heap_swap()

void finish_heap_swap ( Oid  OIDOldHeap,
Oid  OIDNewHeap,
bool  is_system_catalog,
bool  swap_toast_by_content,
bool  check_constraints,
bool  is_internal,
bool  reindex,
TransactionId  frozenXid,
MultiXactId  cutoffMulti,
char  newrelpersistence 
)
extern

Definition at line 1882 of file repack.c.

1891{
1892 ObjectAddress object;
1893 Oid mapped_tables[4];
1894 int i;
1895
1896 /* Report that we are now swapping relation files */
1899
1900 /* Zero out possible results from swapped_relation_files */
1901 memset(mapped_tables, 0, sizeof(mapped_tables));
1902
1903 /*
1904 * Swap the contents of the heap relations (including any toast tables).
1905 * Also set old heap's relfrozenxid to frozenXid.
1906 */
1909 swap_toast_by_content, is_internal,
1911
1912 /*
1913 * If it's a system catalog, queue a sinval message to flush all catcaches
1914 * on the catalog when we reach CommandCounterIncrement.
1915 */
1918
1919 if (reindex)
1920 {
1921 int reindex_flags;
1923
1924 /*
1925 * Rebuild each index on the relation (but not the toast table, which
1926 * is all-new at this point). It is important to do this before the
1927 * DROP step because if we are processing a system catalog that will
1928 * be used during DROP, we want to have its indexes available. There
1929 * is no advantage to the other order anyway because this is all
1930 * transactional, so no chance to reclaim disk space before commit. We
1931 * do not need a final CommandCounterIncrement() because
1932 * reindex_relation does it.
1933 *
1934 * Note: because index_build is called via reindex_relation, it will
1935 * never set indcheckxmin true for the indexes. This is OK even
1936 * though in some sense we are building new indexes rather than
1937 * rebuilding existing ones, because the new heap won't contain any
1938 * HOT chains at all, let alone broken ones, so it can't be necessary
1939 * to set indcheckxmin.
1940 */
1944
1945 /*
1946 * Ensure that the indexes have the same persistence as the parent
1947 * relation.
1948 */
1949 if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
1951 else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
1953
1954 /* Report that we are now reindexing relations */
1957
1959 }
1960
1961 /* Report that we are now doing clean up */
1964
1965 /*
1966 * If the relation being rebuilt is pg_class, swap_relation_files()
1967 * couldn't update pg_class's own pg_class entry (check comments in
1968 * swap_relation_files()), thus relfrozenxid was not updated. That's
1969 * annoying because a potential reason for doing a VACUUM FULL is an
1970 * imminent or actual anti-wraparound shutdown. So, now that we can
1971 * access the new relation using its indices, update relfrozenxid.
1972 * pg_class doesn't have a toast relation, so we don't need to update the
1973 * corresponding toast relation. Note that there's little point moving all
1974 * relfrozenxid updates here since swap_relation_files() needs to write to
1975 * pg_class for non-mapped relations anyway.
1976 */
1978 {
1982
1984
1987 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1989
1990 relform->relfrozenxid = frozenXid;
1991 relform->relminmxid = cutoffMulti;
1992
1994
1996 }
1997
1998 /* Destroy new heap with old filenumber */
1999 object.classId = RelationRelationId;
2000 object.objectId = OIDNewHeap;
2001 object.objectSubId = 0;
2002
2003 if (!reindex)
2004 {
2005 /*
2006 * Make sure the changes in pg_class are visible. This is especially
2007 * important if !swap_toast_by_content, so that the correct TOAST
2008 * relation is dropped. (reindex_relation() above did not help in this
2009 * case.)
2010 */
2012 }
2013
2014 /*
2015 * The new relation is local to our transaction and we know nothing
2016 * depends on it, so DROP_RESTRICT should be OK.
2017 */
2019
2020 /* performDeletion does CommandCounterIncrement at end */
2021
2022 /*
2023 * Now we must remove any relation mapping entries that we set up for the
2024 * transient table, as well as its toast table and toast index if any. If
2025 * we fail to do this before commit, the relmapper will complain about new
2026 * permanent map entries being added post-bootstrap.
2027 */
2028 for (i = 0; OidIsValid(mapped_tables[i]); i++)
2030
2031 /*
2032 * At this point, everything is kosher except that, if we did toast swap
2033 * by links, the toast table's name corresponds to the transient table.
2034 * The name is irrelevant to the backend because it's referenced by OID,
2035 * but users looking at the catalogs could be confused. Rename it to
2036 * prevent this problem.
2037 *
2038 * Note no lock required on the relation, because we already hold an
2039 * exclusive lock on it.
2040 */
2042 {
2044
2046 if (OidIsValid(newrel->rd_rel->reltoastrelid))
2047 {
2048 Oid toastidx;
2050
2051 /* Get the associated valid index to be renamed */
2052 toastidx = toast_get_valid_index(newrel->rd_rel->reltoastrelid,
2054
2055 /* rename the toast table ... */
2056 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
2057 OIDOldHeap);
2058 RenameRelationInternal(newrel->rd_rel->reltoastrelid,
2059 NewToastName, true, false);
2060
2061 /* ... and its valid index too. */
2062 snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
2063 OIDOldHeap);
2064
2066 NewToastName, true, true);
2067
2068 /*
2069 * Reset the relrewrite for the toast. The command-counter
2070 * increment is required here as we are about to update the tuple
2071 * that is updated as part of RenameRelationInternal.
2072 */
2074 ResetRelRewrite(newrel->rd_rel->reltoastrelid);
2075 }
2077 }
2078
2079 /* if it's not a catalog table, clear any missing attribute settings */
2080 if (!is_system_catalog)
2081 {
2083
2087 }
2088}
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition dependency.c:279
#define PERFORM_DELETION_INTERNAL
Definition dependency.h:92
void RelationClearMissing(Relation rel)
Definition heap.c:1981
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
bool reindex_relation(const ReindexStmt *stmt, Oid relid, int flags, const ReindexParams *params)
Definition index.c:3969
#define REINDEX_REL_FORCE_INDEXES_UNLOGGED
Definition index.h:169
#define REINDEX_REL_SUPPRESS_INDEX_USE
Definition index.h:167
#define REINDEX_REL_FORCE_INDEXES_PERMANENT
Definition index.h:170
#define REINDEX_REL_CHECK_CONSTRAINTS
Definition index.h:168
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CacheInvalidateCatalog(Oid catalogId)
Definition inval.c:1609
int i
Definition isn.c:77
#define RowExclusiveLock
Definition lockdefs.h:38
@ DROP_RESTRICT
FormData_pg_class * Form_pg_class
Definition pg_class.h:160
#define NAMEDATALEN
#define snprintf
Definition port.h:261
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
#define PROGRESS_REPACK_PHASE
Definition progress.h:86
#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES
Definition progress.h:104
#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP
Definition progress.h:106
#define PROGRESS_REPACK_PHASE_REBUILD_INDEX
Definition progress.h:105
void RelationMapRemoveMapping(Oid relationId)
Definition relmapper.c:439
static void swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class, bool swap_toast_by_content, bool is_internal, TransactionId frozenXid, MultiXactId cutoffMulti, Oid *mapped_tables)
Definition repack.c:1500
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ResetRelRewrite(Oid myrelid)
Definition tablecmds.c:4420
void RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bool is_index)
Definition tablecmds.c:4327
Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock)
void CommandCounterIncrement(void)
Definition xact.c:1130

References AccessExclusiveLock, CacheInvalidateCatalog(), CatalogTupleUpdate(), CommandCounterIncrement(), DROP_RESTRICT, elog, ERROR, fb(), GETSTRUCT(), HeapTupleIsValid, i, NAMEDATALEN, NoLock, ObjectIdGetDatum(), OidIsValid, PERFORM_DELETION_INTERNAL, performDeletion(), pgstat_progress_update_param(), PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP, PROGRESS_REPACK_PHASE_REBUILD_INDEX, PROGRESS_REPACK_PHASE_SWAP_REL_FILES, REINDEX_REL_CHECK_CONSTRAINTS, REINDEX_REL_FORCE_INDEXES_PERMANENT, REINDEX_REL_FORCE_INDEXES_UNLOGGED, REINDEX_REL_SUPPRESS_INDEX_USE, reindex_relation(), relation_close(), RelationClearMissing(), RelationMapRemoveMapping(), RenameRelationInternal(), ResetRelRewrite(), RowExclusiveLock, SearchSysCacheCopy1, snprintf, swap_relation_files(), table_close(), table_open(), and toast_get_valid_index().

Referenced by ATRewriteTables(), rebuild_relation(), rebuild_relation_finish_concurrent(), and refresh_by_heap_swap().

◆ HandleRepackMessageInterrupt()

void HandleRepackMessageInterrupt ( void  )
extern

Definition at line 3612 of file repack.c.

3613{
3614 InterruptPending = true;
3615 RepackMessagePending = true;
3617}
volatile sig_atomic_t InterruptPending
Definition globals.c:32
struct Latch * MyLatch
Definition globals.c:65
void SetLatch(Latch *latch)
Definition latch.c:290
volatile sig_atomic_t RepackMessagePending
Definition repack.c:150

References InterruptPending, MyLatch, RepackMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ make_new_heap()

Oid make_new_heap ( Oid  OIDOldHeap,
Oid  NewTableSpace,
Oid  NewAccessMethod,
char  relpersistence,
LOCKMODE  lockmode 
)
extern

Definition at line 1125 of file repack.c.

1127{
1131 Oid toastid;
1133 HeapTuple tuple;
1134 Datum reloptions;
1135 bool isNull;
1137
1138 OldHeap = table_open(OIDOldHeap, lockmode);
1140
1141 /*
1142 * Note that the NewHeap will not receive any of the defaults or
1143 * constraints associated with the OldHeap; we don't need 'em, and there's
1144 * no reason to spend cycles inserting them into the catalogs only to
1145 * delete them.
1146 */
1147
1148 /*
1149 * But we do want to use reloptions of the old heap for new heap.
1150 */
1152 if (!HeapTupleIsValid(tuple))
1153 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
1154 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1155 &isNull);
1156 if (isNull)
1157 reloptions = (Datum) 0;
1158
1159 if (relpersistence == RELPERSISTENCE_TEMP)
1161 else
1163
1164 /*
1165 * Create the new heap, using a temporary name in the same namespace as
1166 * the existing table. NOTE: there is some risk of collision with user
1167 * relnames. Working around this seems more trouble than it's worth; in
1168 * particular, we can't create the new heap in a different namespace from
1169 * the old, or we will have problems with the TEMP status of temp tables.
1170 *
1171 * Note: the new heap is not a shared relation, even if we are rebuilding
1172 * a shared rel. However, we do make the new heap mapped if the source is
1173 * mapped. This simplifies swap_relation_files, and is absolutely
1174 * necessary for rebuilding pg_class, for reasons explained there.
1175 */
1176 snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
1177
1181 InvalidOid,
1182 InvalidOid,
1183 InvalidOid,
1184 OldHeap->rd_rel->relowner,
1187 NIL,
1189 relpersistence,
1190 false,
1193 reloptions,
1194 false,
1195 true,
1196 true,
1197 OIDOldHeap,
1198 NULL);
1200
1201 ReleaseSysCache(tuple);
1202
1203 /*
1204 * Advance command counter so that the newly-created relation's catalog
1205 * tuples will be visible to table_open.
1206 */
1208
1209 /*
1210 * If necessary, create a TOAST table for the new relation.
1211 *
1212 * If the relation doesn't have a TOAST table already, we can't need one
1213 * for the new relation. The other way around is possible though: if some
1214 * wide columns have been dropped, NewHeapCreateToastTable can decide that
1215 * no TOAST table is needed for the new table.
1216 *
1217 * Note that NewHeapCreateToastTable ends with CommandCounterIncrement, so
1218 * that the TOAST table will be visible for insertion.
1219 */
1220 toastid = OldHeap->rd_rel->reltoastrelid;
1221 if (OidIsValid(toastid))
1222 {
1223 /* keep the existing toast table's reloptions, if any */
1225 if (!HeapTupleIsValid(tuple))
1226 elog(ERROR, "cache lookup failed for relation %u", toastid);
1227 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
1228 &isNull);
1229 if (isNull)
1230 reloptions = (Datum) 0;
1231
1232 NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
1233
1234 ReleaseSysCache(tuple);
1235 }
1236
1238
1239 return OIDNewHeap;
1240}
Oid heap_create_with_catalog(const char *relname, Oid relnamespace, Oid reltablespace, Oid relid, Oid reltypeid, Oid reloftypeid, Oid ownerid, Oid accessmtd, TupleDesc tupdesc, List *cooked_constraints, char relkind, char relpersistence, bool shared_relation, bool mapped_relation, OnCommitAction oncommit, Datum reloptions, bool use_user_acl, bool allow_system_table_mods, bool is_internal, Oid relrewrite, ObjectAddress *typaddress)
Definition heap.c:1137
Oid LookupCreationNamespace(const char *nspname)
Definition namespace.c:3500
#define NIL
Definition pg_list.h:68
@ ONCOMMIT_NOOP
Definition primnodes.h:59
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationIsMapped(relation)
Definition rel.h:565
#define RelationGetNamespace(relation)
Definition rel.h:557
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
void NewHeapCreateToastTable(Oid relOid, Datum reloptions, LOCKMODE lockmode, Oid OIDOldToast)
Definition toasting.c:65

References Assert, CommandCounterIncrement(), elog, ERROR, fb(), heap_create_with_catalog(), HeapTupleIsValid, InvalidOid, LookupCreationNamespace(), NAMEDATALEN, NewHeapCreateToastTable(), NIL, NoLock, ObjectIdGetDatum(), OidIsValid, ONCOMMIT_NOOP, RelationGetDescr, RelationGetNamespace, RelationIsMapped, ReleaseSysCache(), SearchSysCache1(), snprintf, SysCacheGetAttr(), table_close(), and table_open().

Referenced by ATRewriteTables(), rebuild_relation(), and RefreshMatViewByOid().

◆ mark_index_clustered()

void mark_index_clustered ( Relation  rel,
Oid  indexOid,
bool  is_internal 
)
extern

Definition at line 827 of file repack.c.

828{
833
834 Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
835
836 /*
837 * If the index is already marked clustered, no need to do anything.
838 */
839 if (OidIsValid(indexOid))
840 {
841 if (get_index_isclustered(indexOid))
842 return;
843 }
844
845 /*
846 * Check each index of the relation and set/clear the bit as needed.
847 */
849
850 foreach(index, RelationGetIndexList(rel))
851 {
853
857 elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
859
860 /*
861 * Unset the bit if set. We know it's wrong because we checked this
862 * earlier.
863 */
864 if (indexForm->indisclustered)
865 {
866 indexForm->indisclustered = false;
868 }
869 else if (thisIndexOid == indexOid)
870 {
871 /* this was checked earlier, but let's be real sure */
872 if (!indexForm->indisvalid)
873 elog(ERROR, "cannot cluster on invalid index %u", indexOid);
874 indexForm->indisclustered = true;
876 }
877
879 InvalidOid, is_internal);
880
882 }
883
885}
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
bool get_index_isclustered(Oid index_oid)
Definition lsyscache.c:3879
#define InvokeObjectPostAlterHookArg(classId, objectId, subId, auxiliaryId, is_internal)
END_CATALOG_STRUCT typedef FormData_pg_index * Form_pg_index
Definition pg_index.h:74
#define lfirst_oid(lc)
Definition pg_list.h:174
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4837

References Assert, CatalogTupleUpdate(), elog, ERROR, fb(), Form_pg_index, get_index_isclustered(), GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, InvalidOid, InvokeObjectPostAlterHookArg, lfirst_oid, ObjectIdGetDatum(), OidIsValid, RelationData::rd_rel, RelationGetIndexList(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ATExecClusterOn(), ATExecDropCluster(), and rebuild_relation().

◆ ProcessRepackMessages()

void ProcessRepackMessages ( void  )
extern

Definition at line 3623 of file repack.c.

3624{
3625 MemoryContext oldcontext;
3627
3628 /*
3629 * Nothing to do if we haven't launched the worker yet or have already
3630 * terminated it.
3631 */
3632 if (decoding_worker == NULL)
3633 return;
3634
3635 /*
3636 * This is invoked from ProcessInterrupts(), and since some of the
3637 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
3638 * for recursive calls if more signals are received while this runs. It's
3639 * unclear that recursive entry would be safe, and it doesn't seem useful
3640 * even if it is safe, so let's block interrupts until done.
3641 */
3643
3644 /*
3645 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
3646 * don't want to risk leaking data into long-lived contexts, so let's do
3647 * our work here in a private context that we can reset on each use.
3648 */
3649 if (hpm_context == NULL) /* first time through? */
3651 "ProcessRepackMessages",
3653 else
3655
3656 oldcontext = MemoryContextSwitchTo(hpm_context);
3657
3658 /* OK to process messages. Reset the flag saying there are more to do. */
3659 RepackMessagePending = false;
3660
3661 /*
3662 * Read as many messages as we can from the worker, but stop when no more
3663 * messages can be read from the worker without blocking.
3664 */
3665 while (true)
3666 {
3667 shm_mq_result res;
3668 Size nbytes;
3669 void *data;
3670
3672 &data, true);
3673 if (res == SHM_MQ_WOULD_BLOCK)
3674 break;
3675 else if (res == SHM_MQ_SUCCESS)
3676 {
3677 StringInfoData msg;
3678
3679 initStringInfo(&msg);
3680 appendBinaryStringInfo(&msg, data, nbytes);
3682 pfree(msg.data);
3683 }
3684 else
3685 {
3686 /*
3687 * The decoding worker is special in that it exits as soon as it
3688 * has its work done. Thus the DETACHED result code is fine.
3689 */
3690 Assert(res == SHM_MQ_DETACHED);
3691
3692 break;
3693 }
3694 }
3695
3696 MemoryContextSwitchTo(oldcontext);
3697
3698 /* Might as well clear the context on our way out */
3700
3702}
size_t Size
Definition c.h:689
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:406
void pfree(void *pointer)
Definition mcxt.c:1619
MemoryContext TopMemoryContext
Definition mcxt.c:167
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
const void * data
static void ProcessRepackMessage(StringInfo msg)
Definition repack.c:3708
static DecodingWorker * decoding_worker
Definition repack.c:144
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
@ SHM_MQ_DETACHED
Definition shm_mq.h:42
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
shm_mq_handle * error_mqh
Definition repack.c:140

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), Assert, StringInfoData::data, data, decoding_worker, DecodingWorker::error_mqh, fb(), HOLD_INTERRUPTS, initStringInfo(), MemoryContextReset(), MemoryContextSwitchTo(), pfree(), ProcessRepackMessage(), RepackMessagePending, RESUME_INTERRUPTS, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ RepackWorkerMain()

void RepackWorkerMain ( Datum  main_arg)
extern

Definition at line 60 of file repack_worker.c.

61{
62 dsm_segment *seg;
64 shm_mq *mq;
67 SharedFileSet *sfs;
68 Snapshot snapshot;
69
70 am_repack_worker = true;
71
72 /*
73 * Override the default bgworker_die() with die() so we can use
74 * CHECK_FOR_INTERRUPTS().
75 */
78
80 if (seg == NULL)
83 errmsg("could not map dynamic shared memory segment"));
85
87
88 /* Arrange to signal the leader if we exit. */
90
91 /*
92 * Join locking group - see the comments around the call of
93 * start_repack_decoding_worker().
94 */
95 if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
96 return; /* The leader is not running anymore. */
97
98 /*
99 * Set up a queue to send error messages to the backend that launched this
100 * worker.
101 */
102 mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
104 mqh = shm_mq_attach(mq, seg, NULL);
107 shared->backend_proc_number);
108
109 /* Connect to the database. LOGIN is not required. */
112
113 /*
114 * Transaction is needed to open relation, and it also provides us with a
115 * resource owner.
116 */
118
120
121 /*
122 * Not sure the spinlock is needed here - the backend should not change
123 * anything in the shared memory until we have serialized the snapshot.
124 */
125 SpinLockAcquire(&shared->mutex);
127 sfs = &shared->sfs;
128 SpinLockRelease(&shared->mutex);
129
130 SharedFileSetAttach(sfs, seg);
131
132 /*
133 * Prepare to capture the concurrent data changes ourselves.
134 */
136
137 /* Announce that we're ready. */
138 SpinLockAcquire(&shared->mutex);
139 shared->initialized = true;
140 SpinLockRelease(&shared->mutex);
141 ConditionVariableSignal(&shared->cv);
142
143 /* There doesn't seem to be a nice API to set these */
145 XactReadOnly = true;
146
147 /* Build the initial snapshot and export it. */
148 snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
149 export_initial_snapshot(snapshot, shared);
150
151 /*
152 * Only historic snapshots should be used now. Do not let us restrict the
153 * progress of xmin horizon.
154 */
156
157 for (;;)
158 {
159 bool stop = decode_concurrent_changes(decoding_ctx, shared);
160
161 if (stop)
162 break;
163
164 }
165
166 /* Cleanup. */
169}
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
#define BGWORKER_BYPASS_ROLELOGINCHECK
Definition bgworker.h:167
#define BUFFERALIGN(LEN)
Definition c.h:898
void ConditionVariableSignal(ConditionVariable *cv)
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1103
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:673
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
#define die(msg)
#define pqsignal
Definition port.h:548
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
#define PointerGetDatum(X)
Definition postgres.h:354
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:85
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:56
static void RepackWorkerShutdown(int code, Datum arg)
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared)
static dsm_segment * worker_dsm_segment
static void export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
static LogicalDecodingContext * repack_setup_logical_decoding(Oid relid)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:458
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:71
bool BecomeLockGroupMember(PGPROC *leader, int pid)
Definition proc.c:2072
ConditionVariable cv
char error_queue[FLEXIBLE_ARRAY_MEMBER]
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References am_repack_worker, Assert, DecodingWorkerShared::backend_pid, DecodingWorkerShared::backend_proc, DecodingWorkerShared::backend_proc_number, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BGWORKER_BYPASS_ROLELOGINCHECK, BUFFERALIGN, CommitTransactionCommand(), ConditionVariableSignal(), DecodingWorkerShared::cv, DatumGetUInt32(), DecodingWorkerShared::dbid, decode_concurrent_changes(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg, ERROR, DecodingWorkerShared::error_queue, export_initial_snapshot(), fb(), DecodingWorkerShared::initialized, InvalidateCatalogSnapshot(), DecodingWorkerShared::lsn_upto, DecodingWorkerShared::mutex, MyProc, PointerGetDatum, pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, DecodingWorkerShared::relid, repack_cleanup_logical_decoding(), repack_setup_logical_decoding(), RepackWorkerShutdown(), DecodingWorkerShared::roleid, DecodingWorkerShared::sfs, SharedFileSetAttach(), shm_mq_attach(), shm_mq_set_sender(), SnapBuildInitialSnapshot(), SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), worker_dsm_segment, XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XLogRecPtrIsValid.

Variable Documentation

◆ RepackMessagePending

PGDLLIMPORT volatile sig_atomic_t RepackMessagePending
extern