PostgreSQL Source Code git master
Loading...
Searching...
No Matches
repack_worker.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * repack_worker.c
4 * Implementation of the background worker for ad-hoc logical decoding
5 * during REPACK (CONCURRENTLY).
6 *
7 *
8 * Copyright (c) 2026, PostgreSQL Global Development Group
9 *
10 *
11 * IDENTIFICATION
12 * src/backend/commands/repack_worker.c
13 *
14 *-------------------------------------------------------------------------
15 */
16#include "postgres.h"
17
18#include "access/table.h"
20#include "access/xlogutils.h"
21#include "access/xlogwait.h"
22#include "commands/repack.h"
24#include "libpq/pqmq.h"
26#include "storage/ipc.h"
27#include "storage/proc.h"
28#include "tcop/tcopprot.h"
29#include "utils/memutils.h"
30
/*
 * Name of the logical decoding output plugin used by the REPACK worker.
 * NOTE(review): presumably passed to CreateInitDecodingContext(); that call
 * is only partially visible in this view — TODO confirm.
 */
31#define REPL_PLUGIN_NAME "pgrepack"
32
33static void RepackWorkerShutdown(int code, Datum arg);
36static void export_initial_snapshot(Snapshot snapshot,
37 DecodingWorkerShared *shared);
39 DecodingWorkerShared *shared);
40
41/*
 * Is this process a REPACK worker?  Set by RepackWorkerMain() at startup and
 * reported by AmRepackWorker().
 */
42static bool am_repack_worker = false;
43
44/* The WAL segment being decoded. */
46
47/* Our DSM segment, for shutting down */
49
50/*
51 * Keep track of the table we're processing, to skip logical decoding of data
52 * from other relations.
53 */
56
57
58/*
 * REPACK decoding worker entry point.
 *
 * Runs in a background worker launched by the backend executing REPACK
 * (CONCURRENTLY). As far as visible here, it:
 *
 * - maps the DSM segment shared with the launching backend and joins that
 *   backend's lock group (returning quietly if the leader is gone),
 * - sends error messages to the launching backend through the error queue
 *   stored in shared memory,
 * - attaches to the shared file set used to hand data back to the backend,
 * - sets up logical decoding, sets shared->initialized and signals
 *   shared->cv so the backend knows the worker is ready,
 * - builds and exports the initial snapshot, and then
 * - repeatedly calls decode_concurrent_changes() until it reports that
 *   there is no more work.
 *
 * NOTE(review): the signature line is not visible in this view; the symbol
 * index lists it as RepackWorkerMain(Datum main_arg), with main_arg
 * presumably carrying the DSM handle passed to dsm_attach() — TODO confirm.
 */
59void
61{
62 dsm_segment *seg;
64 shm_mq *mq;
67 SharedFileSet *sfs;
68 Snapshot snapshot;
69
	/* Let AmRepackWorker() report that this process is a REPACK worker. */
70 am_repack_worker = true;
71
72 /*
73 * Override the default bgworker_die() with die() so we can use
74 * CHECK_FOR_INTERRUPTS().
75 */
78
80 if (seg == NULL)
83 errmsg("could not map dynamic shared memory segment"));
85
87
88 /* Arrange to signal the leader if we exit. */
90
91 /*
92 * Join locking group - see the comments around the call of
93 * start_repack_decoding_worker().
94 */
95 if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
96 return; /* The leader is not running anymore. */
97
98 /*
99 * Setup a queue to send error messages to the backend that launched this
100 * worker.
	 *
	 * The queue lives in shared->error_queue; its start address is aligned
	 * with BUFFERALIGN() before it is used as a shm_mq.
101 */
102 mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
104 mqh = shm_mq_attach(mq, seg, NULL);
107 shared->backend_proc_number);
108
109 /* Connect to the database. LOGIN is not required. */
112
113 /*
114 * Transaction is needed to open relation, and it also provides us with a
115 * resource owner.
116 */
118
120
121 /*
122 * Not sure the spinlock is needed here - the backend should not change
123 * anything in the shared memory until we have serialized the snapshot.
124 */
125 SpinLockAcquire(&shared->mutex);
127 sfs = &shared->sfs;
128 SpinLockRelease(&shared->mutex);
129
130 SharedFileSetAttach(sfs, seg);
131
132 /*
133 * Prepare to capture the concurrent data changes ourselves.
134 */
136
137 /* Announce that we're ready. */
138 SpinLockAcquire(&shared->mutex);
139 shared->initialized = true;
140 SpinLockRelease(&shared->mutex);
141 ConditionVariableSignal(&shared->cv);
142
143 /* There doesn't seem to be a nice API to set these */
145 XactReadOnly = true;
146
147 /* Build the initial snapshot and export it. */
148 snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
149 export_initial_snapshot(snapshot, shared);
150
151 /*
152 * Only historic snapshots should be used now. Do not let us restrict the
153 * progress of xmin horizon.
154 */
156
	/*
	 * Each call decodes one batch of changes into a file for the backend;
	 * a true result means there is no more work for the worker.
	 */
157 for (;;)
158 {
159 bool stop = decode_concurrent_changes(decoding_ctx, shared);
160
161 if (stop)
162 break;
163
164 }
165
166 /* Cleanup. */
169}
170
171/*
172 * See ParallelWorkerShutdown for details.
 *
 * Exit callback of the REPACK decoding worker, declared in this file as
 * RepackWorkerShutdown(int code, Datum arg).
 *
 * NOTE(review): the body is not visible in this view. RepackWorkerMain()
 * says it arranges "to signal the leader if we exit", and the file references
 * before_shmem_exit(), SendProcSignal() and PROCSIG_REPACK_MESSAGE, so this
 * presumably notifies the launching backend on exit — TODO confirm.
173 */
174static void
185
/*
 * Is the current process a REPACK decoding worker?
 *
 * Returns am_repack_worker. That flag starts out false and is set to true
 * only by RepackWorkerMain().
 */
186bool
188{
189 return am_repack_worker;
190}
191
192/*
193 * This function is much like pg_create_logical_replication_slot() except that
194 * the new slot is neither released (if anyone else could read changes from
195 * our slot, we could miss changes other backends do while we copy the
196 * existing data into temporary table), nor persisted (it's easier to handle
197 * crash by restarting all the work from scratch).
 *
 * The slot is named "repack_<PID>" and created as RS_TEMPORARY so that it is
 * cleaned up on ERROR. The table 'relid' is opened to look up its TOAST
 * relation; decoding is meant to be restricted to those two relations (see
 * change_useless_for_repack()).
 *
 * Returns the new logical decoding context. Its private state records the
 * caller's current memory context and resource owner for long-lived
 * resources, and has no output file yet (dstate->file is NULL).
198 */
201{
202 Relation rel;
205 NameData slotname;
208
209 /*
210 * REPACK CONCURRENTLY is not allowed in a transaction block, so this
211 * should never fire.
212 */
214
215 /*
216 * Make sure we can use logical decoding.
217 */
219
220 /*
221 * A single backend should not execute multiple REPACK commands at a time,
222 * so use PID to make the slot unique.
223 *
224 * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
225 */
226 snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
227 ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
228 false, false);
229
231
232 /*
233 * Neither prepare_write nor do_write callback nor update_progress is
234 * useful for us.
235 */
237 NIL,
238 true,
239 true,
241 XL_ROUTINE(.page_read = read_local_xlog_page,
242 .segment_open = wal_segment_open,
243 .segment_close = wal_segment_close),
244 NULL, NULL, NULL);
245
246 /*
247 * We don't have control over setting fast_forward, so at least check it.
248 */
249 Assert(!ctx->fast_forward);
250
251 /* Avoid logical decoding of other relations. */
252 rel = table_open(relid, AccessShareLock);
254 toastrelid = rel->rd_rel->reltoastrelid;
256 {
258
259 /* Avoid logical decoding of other TOAST relations. */
263 }
265
267
268 /*
269 * decode_concurrent_changes() needs non-blocking callback.
270 */
272
273 /* Some WAL records should have been read. */
275
276 /*
277 * Initialize repack_current_segment so that we can notice WAL segment
278 * boundaries.
279 */
282
283 /* Our private state belongs to the decoding context. */
285
286 /*
287 * read_local_xlog_page_no_wait() needs to be able to indicate the end of
288 * WAL.
289 */
293
294#ifdef USE_ASSERT_CHECKING
295 dstate->relid = relid;
296#endif
297
298 dstate->change_cxt = AllocSetContextCreate(ctx->context,
299 "REPACK - change",
301
302 /* The file will be set as soon as we have it opened. */
303 dstate->file = NULL;
304
305 /*
306 * Memory context and resource owner for long-lived resources.
307 */
308 dstate->worker_cxt = CurrentMemoryContext;
309 dstate->worker_resowner = CurrentResourceOwner;
310
312
313 return ctx;
314}
315
/*
 * NOTE(review): the body of repack_cleanup_logical_decoding(ctx) is not
 * visible in this view. Going by its name, it presumably tears down what
 * repack_setup_logical_decoding() created; the file references
 * FreeDecodingContext() and ReplicationSlotDropAcquired() — TODO confirm.
 */
316static void
328
329/*
330 * Make snapshot available to the backend that launched the decoding worker.
 *
 * The snapshot is serialized with SerializeSnapshot() into a palloc'd buffer
 * and written to a new file in the shared file set. The file is named by
 * DecodingWorkerFileName() for shared->relid and sequence number
 * shared->last_exported + 1. To simplify restoring, the file starts with
 * snap_size; the serialized bytes presumably follow (that write is not
 * visible in this view).
 *
 * After the file is closed, shared->last_exported is incremented under
 * shared->mutex and shared->cv is signaled; that is how the backend learns
 * the file is available.
 *
 * NOTE(review): shared->last_exported is read without the spinlock when
 * naming the file. That looks safe only if this worker is the sole writer of
 * the counter — confirm.
331 */
332static void
334{
335 char fname[MAXPGPATH];
336 BufFile *file;
338 char *snap_space;
339
341 snap_space = (char *) palloc(snap_size);
342 SerializeSnapshot(snapshot, snap_space);
343
344 DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
345 file = BufFileCreateFileSet(&shared->sfs.fs, fname);
346 /* To make restoration easier, write the snapshot size first. */
347 BufFileWrite(file, &snap_size, sizeof(snap_size));
349 BufFileClose(file);
351
352 /* Increase the counter to tell the backend that the file is available. */
353 SpinLockAcquire(&shared->mutex);
354 shared->last_exported++;
355 SpinLockRelease(&shared->mutex);
356 ConditionVariableSignal(&shared->cv);
357}
358
359/*
360 * Decode logical changes from the WAL sequence and store them to a file.
361 *
362 * If true is returned, there is no more work for the worker.
 *
 * The output goes to a new file in the shared file set, named by
 * DecodingWorkerFileName() for sequence number shared->last_exported + 1.
 * Records are read from ctx->reader until its EndRecPtr reaches
 * shared->lsn_upto. Until the backend sets lsn_upto, it is re-read (together
 * with shared->done) under shared->mutex after every read attempt.
 *
 * When no record is available, the function waits for WAL to be flushed past
 * EndRecPtr. The timeout is 100 (presumably milliseconds) while lsn_upto is
 * still unknown, to cover the race described below. Once lsn_upto is known,
 * the timeout is 0, presumably meaning "no timeout" — TODO confirm against
 * WaitForLSN().
 *
 * On exit the file is closed, shared->lsn_upto is reset to
 * InvalidXLogRecPtr, shared->last_exported is incremented under
 * shared->mutex, and shared->cv is signaled so the backend can read the file.
 * The return value is the shared->done flag read along with lsn_upto.
363 */
364static bool
366 DecodingWorkerShared *shared)
367{
369 XLogRecPtr lsn_upto;
370 bool done;
371 char fname[MAXPGPATH];
372
374
375 /* Open the output file. */
376 DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
377 dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
378
379 SpinLockAcquire(&shared->mutex);
380 lsn_upto = shared->lsn_upto;
381 done = shared->done;
382 SpinLockRelease(&shared->mutex);
383
384 while (true)
385 {
386 XLogRecord *record;
388 char *errm = NULL;
389 XLogRecPtr end_lsn;
390
392
393 record = XLogReadRecord(ctx->reader, &errm);
394 if (record)
395 {
397
398 /*
399 * If WAL segment boundary has been crossed, inform the decoding
400 * system that the catalog_xmin can advance.
401 */
402 end_lsn = ctx->reader->EndRecPtr;
405 {
			/*
			 * NOTE(review): LSNs are formatted by hand here; consider
			 * LSN_FORMAT_ARGS() with the project's standard LSN format
			 * string for consistency — verify before changing.
			 */
407 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
408 (uint32) (end_lsn >> 32), (uint32) end_lsn);
410 }
411 }
412 else
413 {
415
416 if (errm)
418 errmsg("%s", errm));
419
420 /*
421 * In the decoding loop we do not want to get blocked when there
422 * is no more WAL available, otherwise the loop would become
423 * uninterruptible.
424 */
426 if (priv->end_of_wal)
427 /* Do not miss the end of WAL condition next time. */
428 priv->end_of_wal = false;
429 else
431 errmsg("could not read WAL record"));
432 }
433
434 /*
435 * Whether we could read new record or not, keep checking if
436 * 'lsn_upto' was specified.
437 */
438 if (!XLogRecPtrIsValid(lsn_upto))
439 {
440 SpinLockAcquire(&shared->mutex);
441 lsn_upto = shared->lsn_upto;
442 /* 'done' should be set at the same time as 'lsn_upto' */
443 done = shared->done;
444 SpinLockRelease(&shared->mutex);
445 }
446 if (XLogRecPtrIsValid(lsn_upto) &&
447 ctx->reader->EndRecPtr >= lsn_upto)
448 break;
449
450 if (record == NULL)
451 {
452 int64 timeout = 0;
453 WaitLSNResult res;
454
455 /*
456 * Before we retry reading, wait until new WAL is flushed.
457 *
458 * There is a race condition such that the backend executing
459 * REPACK determines 'lsn_upto', but before it sets the shared
460 * variable, we reach the end of WAL. In that case we'd need to
461 * wait until the next WAL flush (unrelated to REPACK). Although
462 * that should not be a problem in a busy system, it might be
463 * noticeable in other cases, including regression tests (which
464 * are not necessarily executed in parallel). Therefore it makes
465 * sense to use timeout.
466 *
467 * If lsn_upto is valid, WAL records having LSN lower than that
468 * should already have been flushed to disk.
469 */
470 if (!XLogRecPtrIsValid(lsn_upto))
471 timeout = 100L;
473 ctx->reader->EndRecPtr + 1,
474 timeout);
475 if (res != WAIT_LSN_RESULT_SUCCESS &&
478 errmsg("waiting for WAL failed"));
479 }
480 }
481
482 /*
483 * Close the file so we can make it available to the backend.
484 */
485 BufFileClose(dstate->file);
486 dstate->file = NULL;
487 SpinLockAcquire(&shared->mutex);
488 shared->lsn_upto = InvalidXLogRecPtr;
489 shared->last_exported++;
490 SpinLockRelease(&shared->mutex);
491 ConditionVariableSignal(&shared->cv);
492
493 return done;
494}
495
496/*
497 * Does the WAL record contain a data change that this backend does not need
498 * to decode on behalf of REPACK (CONCURRENTLY)?
 *
 * Returns true (the change can be skipped) only if block 0 of the record
 * belongs to a relation other than the table being repacked and its TOAST
 * relation. Returns false (the change must be decoded) in these cases:
 * - this backend is not involved in REPACK (CONCURRENTLY);
 * - the record has no block 0;
 * - the block belongs to one of those two relations.
 *
 * NOTE(review): the signature is not visible here; the symbol index lists
 * it as change_useless_for_repack(XLogRecordBuffer *buf), with 'r'
 * presumably being the record reader taken from buf — TODO confirm.
499 */
500bool
502{
504 RelFileLocator locator;
505
506 /* TOAST locator should not be set unless the main is. */
509
510 /*
511 * Backends not involved in REPACK (CONCURRENTLY) should not do the
512 * filtering.
513 */
515 return false;
516
517 /*
518 * If the record does not contain the block 0, it's probably not INSERT /
519 * UPDATE / DELETE. In any case, we do not have enough information to
520 * filter the change out.
521 */
522 if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
523 return false;
524
525 /*
526 * Decode the change if it belongs to the table we are repacking, or if it
527 * belongs to its TOAST relation.
528 */
530 return false;
533 return false;
534
535 /* Filter out changes of other tables. */
536 return true;
537}
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
#define BGWORKER_BYPASS_ROLELOGINCHECK
Definition bgworker.h:167
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:677
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition buffile.c:268
void BufFileClose(BufFile *file)
Definition buffile.c:413
#define NameStr(name)
Definition c.h:835
#define BUFFERALIGN(LEN)
Definition c.h:898
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
uint32_t uint32
Definition c.h:624
#define OidIsValid(objectId)
Definition c.h:858
size_t Size
Definition c.h:689
void ConditionVariableSignal(ConditionVariable *cv)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition decode.c:89
void dsm_detach(dsm_segment *seg)
Definition dsm.c:811
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1103
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:673
Datum arg
Definition elog.c:1323
int errcode(int sqlerrcode)
Definition elog.c:875
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
#define palloc0_object(type)
Definition fe_memutils.h:75
int MyProcPid
Definition globals.c:49
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
#define AccessShareLock
Definition lockdefs.h:36
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition logical.c:1816
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:673
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:629
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, bool for_repack, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:325
void CheckLogicalDecodingRequirements(bool repack)
Definition logical.c:111
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:303
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
#define NAMEDATALEN
#define MAXPGPATH
#define NIL
Definition pg_list.h:68
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define die(msg)
#define pqsignal
Definition port.h:547
#define snprintf
Definition port.h:260
static uint32 DatumGetUInt32(Datum X)
Definition postgres.h:222
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define PointerGetDatum(X)
Definition postgres.h:354
#define InvalidOid
unsigned int Oid
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:85
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:56
static int fb(int x)
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:288
@ PROCSIG_REPACK_MESSAGE
Definition procsignal.h:40
#define RelFileLocatorEquals(locator1, locator2)
void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
Definition repack.c:3490
static void RepackWorkerShutdown(int code, Datum arg)
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
static RelFileLocator repacked_rel_toast_locator
bool AmRepackWorker(void)
static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared)
bool change_useless_for_repack(XLogRecordBuffer *buf)
static XLogSegNo repack_current_segment
static dsm_segment * worker_dsm_segment
#define REPL_PLUGIN_NAME
static void export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
static LogicalDecodingContext * repack_setup_logical_decoding(Oid relid)
static RelFileLocator repacked_rel_locator
void RepackWorkerMain(Datum main_arg)
static bool am_repack_worker
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
void ReplicationSlotDropAcquired(void)
Definition slot.c:1042
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
@ RS_TEMPORARY
Definition slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:458
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Definition snapmgr.c:1736
Size EstimateSnapshotSpace(Snapshot snapshot)
Definition snapmgr.c:1712
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:71
bool BecomeLockGroupMember(PGPROC *leader, int pid)
Definition proc.c:2072
ConditionVariable cv
char error_queue[FLEXIBLE_ARRAY_MEMBER]
XLogReaderState * reader
Definition logical.h:42
MemoryContext context
Definition logical.h:36
void * output_writer_private
Definition logical.h:81
RelFileNumber relNumber
RelFileLocator rd_locator
Definition rel.h:57
Form_pg_class rd_rel
Definition rel.h:111
XLogPageReadCB page_read
Definition xlogreader.h:94
DecodedXLogRecord * record
Definition xlogreader.h:235
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
XLogReaderRoutine routine
Definition xlogreader.h:179
void * private_data
Definition xlogreader.h:195
Definition c.h:830
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
#define TransactionIdIsValid(xid)
Definition transam.h:41
bool XactReadOnly
Definition xact.c:84
TransactionId GetTopTransactionIdIfAny(void)
Definition xact.c:443
void StartTransactionCommand(void)
Definition xact.c:3109
int XactIsoLevel
Definition xact.c:81
void CommitTransactionCommand(void)
Definition xact.c:3207
#define XACT_REPEATABLE_READ
Definition xact.h:38
int wal_segment_size
Definition xlog.c:150
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint64 XLogSegNo
Definition xlogdefs.h:52
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:391
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition xlogutils.c:806
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition xlogutils.c:845
int read_local_xlog_page_no_wait(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition xlogutils.c:857
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition xlogwait.c:378
WaitLSNResult
Definition xlogwait.h:26
@ WAIT_LSN_RESULT_TIMEOUT
Definition xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition xlogwait.h:27
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition xlogwait.h:44