PostgreSQL Source Code git master
Loading...
Searching...
No Matches
repack_worker.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "access/xlogwait.h"
#include "commands/repack.h"
#include "commands/repack_internal.h"
#include "libpq/pqmq.h"
#include "replication/snapbuild.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
Include dependency graph for repack_worker.c:

Go to the source code of this file.

Macros

#define REPL_PLUGIN_NAME   "pgrepack"
 

Functions

static void RepackWorkerShutdown (int code, Datum arg)
 
static LogicalDecodingContext * repack_setup_logical_decoding (Oid relid)
 
static void repack_cleanup_logical_decoding (LogicalDecodingContext *ctx)
 
static void export_initial_snapshot (Snapshot snapshot, DecodingWorkerShared *shared)
 
static bool decode_concurrent_changes (LogicalDecodingContext *ctx, DecodingWorkerShared *shared)
 
void RepackWorkerMain (Datum main_arg)
 
bool AmRepackWorker (void)
 
bool change_useless_for_repack (XLogRecordBuffer *buf)
 

Variables

static bool am_repack_worker = false
 
static XLogSegNo repack_current_segment = 0
 
static dsm_segment * worker_dsm_segment = NULL
 
static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid}
 
static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid}
 

Macro Definition Documentation

◆ REPL_PLUGIN_NAME

#define REPL_PLUGIN_NAME   "pgrepack"

Definition at line 31 of file repack_worker.c.

Function Documentation

◆ AmRepackWorker()

bool AmRepackWorker ( void  )

Definition at line 182 of file repack_worker.c.

183{
184 return am_repack_worker;
185}
static bool am_repack_worker

References am_repack_worker.

Referenced by mq_putmessage().

◆ change_useless_for_repack()

bool change_useless_for_repack ( XLogRecordBuffer * buf )

Definition at line 518 of file repack_worker.c.

519{
521 RelFileLocator locator;
522
523 /* TOAST locator should not be set unless the main is. */
526
527 /*
528 * Backends not involved in REPACK (CONCURRENTLY) should not do the
529 * filtering.
530 */
532 return false;
533
534 /*
535 * If the record does not contain the block 0, it's probably not INSERT /
536 * UPDATE / DELETE. In any case, we do not have enough information to
537 * filter the change out.
538 */
539 if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
540 return false;
541
542 /*
543 * Decode the change if it belongs to the table we are repacking, or if it
544 * belongs to its TOAST relation.
545 */
547 return false;
550 return false;
551
552 /* Filter out changes of other tables. */
553 return true;
554}
#define Assert(condition)
Definition c.h:943
#define OidIsValid(objectId)
Definition c.h:858
static char buf[DEFAULT_XLOG_SEG_SIZE]
static int fb(int x)
#define RelFileLocatorEquals(locator1, locator2)
static RelFileLocator repacked_rel_toast_locator
static RelFileLocator repacked_rel_locator
RelFileNumber relNumber
DecodedXLogRecord * record
Definition xlogreader.h:235
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)

References Assert, buf, fb(), OidIsValid, XLogReaderState::record, RelFileLocatorEquals, RelFileLocator::relNumber, repacked_rel_locator, repacked_rel_toast_locator, and XLogRecGetBlockTagExtended().

Referenced by heap2_decode(), and heap_decode().

◆ decode_concurrent_changes()

static bool decode_concurrent_changes ( LogicalDecodingContext * ctx,
DecodingWorkerShared * shared 
)
static

Definition at line 360 of file repack_worker.c.

362{
364 XLogRecPtr lsn_upto;
365 bool done;
366 char fname[MAXPGPATH];
367
369
370 /* Open the output file. */
371 DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
372 dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
373
374 SpinLockAcquire(&shared->mutex);
375 lsn_upto = shared->lsn_upto;
376 done = shared->done;
377 SpinLockRelease(&shared->mutex);
378
379 while (true)
380 {
381 XLogRecord *record;
383 char *errm = NULL;
384 XLogRecPtr end_lsn;
385
387
388 record = XLogReadRecord(ctx->reader, &errm);
389 if (record)
390 {
392
393 /*
394 * We want to allow WAL to be recycled while REPACK is running.
395 *
396 * In normal usage of a replication slot, we need to be very
397 * careful not to advance the LSN until it's been confirmed as
398 * received by the remote. In REPACK's case, this is not needed:
399 * REPACK will never try to replay the same WAL after a crash, and
400 * if there _is_ a crash, the whole REPACK has to be started from
401 * scratch anyway.
402 *
403 * So here we disregard the careful LSN tracking and just move the
404 * LSN locations forward to what we've processed. Note that it
405 * would be bogus to move the xmin forward, though, so we don't
406 * touch that.
407 *
408 * This can be done on whatever schedule is convenient, but in
409 * order not to cause unnecessary load, we only do it as we cross
410 * each WAL segment boundary.
411 */
412 end_lsn = ctx->reader->EndRecPtr;
415 {
418 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
419 (uint32) (end_lsn >> 32), (uint32) end_lsn);
421 }
422 }
423 else
424 {
426
427 if (errm)
430 errmsg("could not read WAL from timeline %u at %X/%08X: %s",
431 ctx->reader->currTLI,
433 errm));
434
435 /*
436 * In the decoding loop we do not want to get blocked when there
437 * is no more WAL available, otherwise the loop would become
438 * uninterruptible.
439 */
441 if (priv->end_of_wal)
442 /* Do not miss the end of WAL condition next time. */
443 priv->end_of_wal = false;
444 else
447 errmsg("could not read WAL record"));
448 }
449
450 /*
451 * Whether we could read new record or not, keep checking if
452 * 'lsn_upto' was specified.
453 */
454 if (!XLogRecPtrIsValid(lsn_upto))
455 {
456 SpinLockAcquire(&shared->mutex);
457 lsn_upto = shared->lsn_upto;
458 /* 'done' should be set at the same time as 'lsn_upto' */
459 done = shared->done;
460 SpinLockRelease(&shared->mutex);
461 }
462 if (XLogRecPtrIsValid(lsn_upto) &&
463 ctx->reader->EndRecPtr >= lsn_upto)
464 break;
465
466 if (record == NULL)
467 {
468 int64 timeout = 0;
469 WaitLSNResult res;
470
471 /*
472 * Before we retry reading, wait until new WAL is flushed.
473 *
474 * There is a race condition such that the backend executing
475 * REPACK determines 'lsn_upto', but before it sets the shared
476 * variable, we reach the end of WAL. In that case we'd need to
477 * wait until the next WAL flush (unrelated to REPACK). Although
478 * that should not be a problem in a busy system, it might be
479 * noticeable in other cases, including regression tests (which
480 * are not necessarily executed in parallel). Therefore it makes
481 * sense to use timeout.
482 *
483 * If lsn_upto is valid, WAL records having LSN lower than that
484 * should already have been flushed to disk.
485 */
486 if (!XLogRecPtrIsValid(lsn_upto))
487 timeout = 100L;
489 ctx->reader->EndRecPtr + 1,
490 timeout);
491 if (res != WAIT_LSN_RESULT_SUCCESS &&
495 errmsg("waiting for WAL failed"));
496 }
497 }
498
499 /*
500 * Close the file so we can make it available to the backend.
501 */
502 BufFileClose(dstate->file);
503 dstate->file = NULL;
504 SpinLockAcquire(&shared->mutex);
505 shared->lsn_upto = InvalidXLogRecPtr;
506 shared->last_exported++;
507 SpinLockRelease(&shared->mutex);
508 ConditionVariableSignal(&shared->cv);
509
510 return done;
511}
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition buffile.c:268
void BufFileClose(BufFile *file)
Definition buffile.c:413
int64_t int64
Definition c.h:621
uint32_t uint32
Definition c.h:624
void ConditionVariableSignal(ConditionVariable *cv)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition decode.c:89
int errcode_for_file_access(void)
Definition elog.c:898
int errcode(int sqlerrcode)
Definition elog.c:875
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition logical.c:1813
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
Definition logical.c:1737
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
#define ERRCODE_DATA_CORRUPTED
#define MAXPGPATH
void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
Definition repack.c:3641
static XLogSegNo repack_current_segment
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
ConditionVariable cv
XLogReaderState * reader
Definition logical.h:42
void * output_writer_private
Definition logical.h:81
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
TimeLineID currTLI
Definition xlogreader.h:284
void * private_data
Definition xlogreader.h:195
int wal_segment_size
Definition xlog.c:150
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint64 XLogSegNo
Definition xlogdefs.h:52
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:391
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition xlogwait.c:403
WaitLSNResult
Definition xlogwait.h:26
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44

References BufFileClose(), BufFileCreateFileSet(), CHECK_FOR_INTERRUPTS, ConditionVariableSignal(), XLogReaderState::currTLI, DecodingWorkerShared::cv, DEBUG1, DecodingWorkerFileName(), DecodingWorkerShared::done, elog, ReadLocalXLogPageNoWaitPrivate::end_of_wal, XLogReaderState::EndRecPtr, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, ERROR, fb(), SharedFileSet::fs, InvalidXLogRecPtr, DecodingWorkerShared::last_exported, LogicalConfirmReceivedLocation(), LogicalDecodingProcessRecord(), LogicalIncreaseRestartDecodingForSlot(), LSN_FORMAT_ARGS, DecodingWorkerShared::lsn_upto, MAXPGPATH, DecodingWorkerShared::mutex, LogicalDecodingContext::output_writer_private, XLogReaderState::private_data, LogicalDecodingContext::reader, DecodingWorkerShared::relid, repack_current_segment, DecodingWorkerShared::sfs, SpinLockAcquire(), SpinLockRelease(), WAIT_LSN_RESULT_SUCCESS, WAIT_LSN_RESULT_TIMEOUT, WAIT_LSN_TYPE_PRIMARY_FLUSH, WaitForLSN(), wal_segment_size, XLByteToSeg, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by RepackWorkerMain().

◆ export_initial_snapshot()

static void export_initial_snapshot ( Snapshot  snapshot,
DecodingWorkerShared * shared 
)
static

Definition at line 328 of file repack_worker.c.

329{
330 char fname[MAXPGPATH];
331 BufFile *file;
333 char *snap_space;
334
336 snap_space = (char *) palloc(snap_size);
337 SerializeSnapshot(snapshot, snap_space);
338
339 DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
340 file = BufFileCreateFileSet(&shared->sfs.fs, fname);
341 /* To make restoration easier, write the snapshot size first. */
342 BufFileWrite(file, &snap_size, sizeof(snap_size));
344 BufFileClose(file);
346
347 /* Increase the counter to tell the backend that the file is available. */
348 SpinLockAcquire(&shared->mutex);
349 shared->last_exported++;
350 SpinLockRelease(&shared->mutex);
351 ConditionVariableSignal(&shared->cv);
352}
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:677
size_t Size
Definition c.h:689
void pfree(void *pointer)
Definition mcxt.c:1619
void * palloc(Size size)
Definition mcxt.c:1390
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Definition snapmgr.c:1736
Size EstimateSnapshotSpace(Snapshot snapshot)
Definition snapmgr.c:1712

References BufFileClose(), BufFileCreateFileSet(), BufFileWrite(), ConditionVariableSignal(), DecodingWorkerShared::cv, DecodingWorkerFileName(), EstimateSnapshotSpace(), fb(), SharedFileSet::fs, DecodingWorkerShared::last_exported, MAXPGPATH, DecodingWorkerShared::mutex, palloc(), pfree(), DecodingWorkerShared::relid, SerializeSnapshot(), DecodingWorkerShared::sfs, SpinLockAcquire(), and SpinLockRelease().

Referenced by RepackWorkerMain().

◆ repack_cleanup_logical_decoding()

static void repack_cleanup_logical_decoding ( LogicalDecodingContext * ctx )
static

Definition at line 312 of file repack_worker.c.

313{
315
317 if (dstate->slot)
319
322}
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:670
void ReplicationSlotDropAcquired(bool try_disable)
Definition slot.c:1031

References ExecDropSingleTupleTableSlot(), fb(), FreeDecodingContext(), LogicalDecodingContext::output_writer_private, and ReplicationSlotDropAcquired().

Referenced by RepackWorkerMain().

◆ repack_setup_logical_decoding()

static LogicalDecodingContext * repack_setup_logical_decoding ( Oid  relid)
static

Definition at line 195 of file repack_worker.c.

196{
197 Relation rel;
200 NameData slotname;
203
204 /*
205 * REPACK CONCURRENTLY is not allowed in a transaction block, so this
206 * should never fire.
207 */
209
210 /*
211 * Make sure we can use logical decoding.
212 */
214
215 /*
216 * A single backend should not execute multiple REPACK commands at a time,
217 * so use PID to make the slot unique.
218 *
219 * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
220 */
221 snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
222 ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
223 false, false);
224
226
227 /*
228 * Neither prepare_write nor do_write callback nor update_progress is
229 * useful for us.
230 */
232 NIL,
233 true,
234 true,
236 XL_ROUTINE(.page_read = read_local_xlog_page,
237 .segment_open = wal_segment_open,
238 .segment_close = wal_segment_close),
239 NULL, NULL, NULL);
240
241 /*
242 * We don't have control on setting fast_forward, so at least check it.
243 */
244 Assert(!ctx->fast_forward);
245
246 /* Avoid logical decoding of other relations. */
247 rel = table_open(relid, AccessShareLock);
249 toastrelid = rel->rd_rel->reltoastrelid;
251 {
253
254 /* Avoid logical decoding of other TOAST relations. */
258 }
260
262
263 /*
264 * decode_concurrent_changes() needs non-blocking callback.
265 */
267
268 /* Some WAL records should have been read. */
270
271 /*
272 * Initialize repack_current_segment so that we can notice WAL segment
273 * boundaries.
274 */
277
278 /* Our private state belongs to the decoding context. */
280
281 /*
282 * read_local_xlog_page_no_wait() needs to be able to indicate the end of
283 * WAL.
284 */
288
289#ifdef USE_ASSERT_CHECKING
290 dstate->relid = relid;
291#endif
292
293 dstate->change_cxt = AllocSetContextCreate(ctx->context,
294 "REPACK - change",
296
297 /* The file will be set as soon as we have it opened. */
298 dstate->file = NULL;
299
300 /*
301 * Memory context and resource owner for long-lived resources.
302 */
303 dstate->worker_cxt = CurrentMemoryContext;
304 dstate->worker_resowner = CurrentResourceOwner;
305
307
308 return ctx;
309}
#define NameStr(name)
Definition c.h:835
#define palloc0_object(type)
Definition fe_memutils.h:90
int MyProcPid
Definition globals.c:49
#define AccessShareLock
Definition lockdefs.h:36
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:626
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, bool for_repack, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:322
void CheckLogicalDecodingRequirements(bool repack)
Definition logical.c:111
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:303
MemoryContext CurrentMemoryContext
Definition mcxt.c:161
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
#define NAMEDATALEN
#define NIL
Definition pg_list.h:68
#define snprintf
Definition port.h:261
unsigned int Oid
#define REPL_PLUGIN_NAME
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
@ RS_TEMPORARY
Definition slot.h:47
MemoryContext context
Definition logical.h:36
RelFileLocator rd_locator
Definition rel.h:57
Form_pg_class rd_rel
Definition rel.h:111
XLogPageReadCB page_read
Definition xlogreader.h:94
XLogReaderRoutine routine
Definition xlogreader.h:179
Definition c.h:830
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
#define TransactionIdIsValid(xid)
Definition transam.h:41
TransactionId GetTopTransactionIdIfAny(void)
Definition xact.c:443
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition xlogutils.c:806
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition xlogutils.c:845
int read_local_xlog_page_no_wait(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition xlogutils.c:857

References AccessShareLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CheckLogicalDecodingRequirements(), LogicalDecodingContext::context, CreateInitDecodingContext(), CurrentMemoryContext, CurrentResourceOwner, DecodingContextFindStartpoint(), XLogReaderState::EndRecPtr, EnsureLogicalDecodingEnabled(), LogicalDecodingContext::fast_forward, fb(), GetTopTransactionIdIfAny(), InvalidXLogRecPtr, MemoryContextSwitchTo(), MyProcPid, NAMEDATALEN, NameStr, NIL, OidIsValid, LogicalDecodingContext::output_writer_private, XLogReaderRoutine::page_read, palloc0_object, XLogReaderState::private_data, RelationData::rd_locator, RelationData::rd_rel, read_local_xlog_page(), read_local_xlog_page_no_wait(), LogicalDecodingContext::reader, repack_current_segment, repacked_rel_locator, repacked_rel_toast_locator, REPL_PLUGIN_NAME, ReplicationSlotCreate(), XLogReaderState::routine, RS_TEMPORARY, snprintf, table_close(), table_open(), TransactionIdIsValid, wal_segment_close(), wal_segment_open(), wal_segment_size, XL_ROUTINE, XLByteToSeg, and XLogRecPtrIsValid.

Referenced by RepackWorkerMain().

◆ RepackWorkerMain()

void RepackWorkerMain ( Datum  main_arg)

Definition at line 60 of file repack_worker.c.

61{
62 dsm_segment *seg;
64 shm_mq *mq;
67 SharedFileSet *sfs;
68 Snapshot snapshot;
69
70 am_repack_worker = true;
71
73
75 if (seg == NULL)
78 errmsg("could not map dynamic shared memory segment"));
80
82
83 /* Arrange to signal the leader if we exit. */
85
86 /*
87 * Join locking group - see the comments around the call of
88 * start_repack_decoding_worker().
89 */
90 if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
91 return; /* The leader is not running anymore. */
92
93 /*
94 * Setup a queue to send error messages to the backend that launched this
95 * worker.
96 */
97 mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
99 mqh = shm_mq_attach(mq, seg, NULL);
102 shared->backend_proc_number);
103
104 /* Connect to the database. LOGIN is not required. */
107
108 /*
109 * Transaction is needed to open relation, and it also provides us with a
110 * resource owner.
111 */
113
115
116 /*
117 * Not sure the spinlock is needed here - the backend should not change
118 * anything in the shared memory until we have serialized the snapshot.
119 */
120 SpinLockAcquire(&shared->mutex);
122 sfs = &shared->sfs;
123 SpinLockRelease(&shared->mutex);
124
125 SharedFileSetAttach(sfs, seg);
126
127 /*
128 * Prepare to capture the concurrent data changes ourselves.
129 */
131
132 /* Announce that we're ready. */
133 SpinLockAcquire(&shared->mutex);
134 shared->initialized = true;
135 SpinLockRelease(&shared->mutex);
136 ConditionVariableSignal(&shared->cv);
137
138 /* There doesn't seem to be a nice API to set these */
140 XactReadOnly = true;
141
142 /* Build the initial snapshot and export it. */
143 snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
144 export_initial_snapshot(snapshot, shared);
145
146 /*
147 * Only historic snapshots should be used now. Do not let us restrict the
148 * progress of xmin horizon.
149 */
151
152 for (;;)
153 {
154 bool stop = decode_concurrent_changes(decoding_ctx, shared);
155
156 if (stop)
157 break;
158
159 }
160
161 /* Cleanup. */
164}
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
#define BGWORKER_BYPASS_ROLELOGINCHECK
Definition bgworker.h:167
#define BUFFERALIGN(LEN)
Definition c.h:898
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1103
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:673
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
#define PointerGetDatum(X)
Definition postgres.h:354
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:85
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:56
static void RepackWorkerShutdown(int code, Datum arg)
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared)
static dsm_segment * worker_dsm_segment
static void export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
static LogicalDecodingContext * repack_setup_logical_decoding(Oid relid)
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:444
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
PGPROC * MyProc
Definition proc.c:71
bool BecomeLockGroupMember(PGPROC *leader, int pid)
Definition proc.c:2105
char error_queue[FLEXIBLE_ARRAY_MEMBER]
bool XactReadOnly
Definition xact.c:84
void StartTransactionCommand(void)
Definition xact.c:3109
int XactIsoLevel
Definition xact.c:81
void CommitTransactionCommand(void)
Definition xact.c:3207
#define XACT_REPEATABLE_READ
Definition xact.h:38

References am_repack_worker, Assert, DecodingWorkerShared::backend_pid, DecodingWorkerShared::backend_proc, DecodingWorkerShared::backend_proc_number, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BGWORKER_BYPASS_ROLELOGINCHECK, BUFFERALIGN, CommitTransactionCommand(), ConditionVariableSignal(), DecodingWorkerShared::cv, DatumGetUInt32(), DecodingWorkerShared::dbid, decode_concurrent_changes(), dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg, ERROR, DecodingWorkerShared::error_queue, export_initial_snapshot(), fb(), DecodingWorkerShared::initialized, InvalidateCatalogSnapshot(), DecodingWorkerShared::lsn_upto, DecodingWorkerShared::mutex, MyProc, PointerGetDatum, pq_redirect_to_shm_mq(), pq_set_parallel_leader(), DecodingWorkerShared::relid, repack_cleanup_logical_decoding(), repack_setup_logical_decoding(), RepackWorkerShutdown(), DecodingWorkerShared::roleid, DecodingWorkerShared::sfs, SharedFileSetAttach(), shm_mq_attach(), shm_mq_set_sender(), SnapBuildInitialSnapshot(), SpinLockAcquire(), SpinLockRelease(), StartTransactionCommand(), worker_dsm_segment, XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XLogRecPtrIsValid.

◆ RepackWorkerShutdown()

static void RepackWorkerShutdown ( int  code,
Datum  arg 
)
static

Definition at line 170 of file repack_worker.c.

171{
173
176 shared->backend_proc_number);
177
179}
void dsm_detach(dsm_segment *seg)
Definition dsm.c:811
Datum arg
Definition elog.c:1323
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:296
@ PROCSIG_REPACK_MESSAGE
Definition procsignal.h:40

References arg, DecodingWorkerShared::backend_pid, DecodingWorkerShared::backend_proc_number, DatumGetPointer(), dsm_detach(), PROCSIG_REPACK_MESSAGE, SendProcSignal(), and worker_dsm_segment.

Referenced by RepackWorkerMain().

Variable Documentation

◆ am_repack_worker

bool am_repack_worker = false
static

Definition at line 42 of file repack_worker.c.

Referenced by AmRepackWorker(), and RepackWorkerMain().

◆ repack_current_segment

XLogSegNo repack_current_segment = 0
static

Definition at line 45 of file repack_worker.c.

Referenced by decode_concurrent_changes(), and repack_setup_logical_decoding().

◆ repacked_rel_locator

RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid}
static

Definition at line 54 of file repack_worker.c.

54{.relNumber = InvalidOid};
#define InvalidOid

Referenced by change_useless_for_repack(), and repack_setup_logical_decoding().

◆ repacked_rel_toast_locator

RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid}
static

Definition at line 55 of file repack_worker.c.

55{.relNumber = InvalidOid};

Referenced by change_useless_for_repack(), and repack_setup_logical_decoding().

◆ worker_dsm_segment

dsm_segment* worker_dsm_segment = NULL
static

Definition at line 48 of file repack_worker.c.

Referenced by RepackWorkerMain(), and RepackWorkerShutdown().