PostgreSQL Source Code git master
logicalctl.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * logicalctl.c
3 * Functionality to control logical decoding status online.
4 *
5 * This module enables dynamic control of logical decoding availability.
6 * Logical decoding becomes active under two conditions: when the wal_level
7 * parameter is set to 'logical', or when at least one valid logical replication
8 * slot exists with wal_level set to 'replica'. The system disables logical
9 * decoding when neither condition is met. Therefore, the dynamic control
10 * of logical decoding availability is required only when wal_level is set
11 * to 'replica'. Logical decoding is always enabled when wal_level='logical'
12 * and always disabled when wal_level='minimal'.
13 *
14 * The core concept of dynamically enabling and disabling logical decoding
15 * is to separately control two aspects: writing information required for
16 * logical decoding to WAL records, and using logical decoding itself. During
17 * activation, we first enable logical WAL writing while keeping logical
18 * decoding disabled. This change is reflected in the read-only
19 * effective_wal_level GUC parameter. Once we ensure that all processes have
20 * updated to the latest effective_wal_level value, we then enable logical
21 * decoding. Deactivation follows a similar careful, multi-step process
22 * in reverse order.
23 *
24 * While activation occurs synchronously right after creating the first
25 * logical slot, deactivation happens asynchronously through the checkpointer
26 * process. This design avoids a race condition at the end of recovery; see
27 * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
28 * Asynchronous deactivation also avoids excessive toggling of the logical
29 * decoding status in workloads that repeatedly create and drop a single
30 * logical slot. On the other hand, this lazy approach can delay changes
31 * to effective_wal_level and the disabling of logical decoding, especially
32 * when the checkpointer is busy with other tasks. We chose this lazy approach
33 * in all deactivation paths to keep the implementation simple, even though
34 * laziness is strictly required only for end-of-recovery cases. Future work
35 * might address this limitation either by using a dedicated worker instead
36 * of the checkpointer, or by implementing synchronous waiting during slot
37 * drops if workloads are significantly affected by the lazy deactivation
38 * of logical decoding.
39 *
40 * Standby servers use the primary server's effective_wal_level and logical
41 * decoding status. Unlike normal activation and deactivation, these
42 * are updated simultaneously without status change coordination, solely by
43 * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
44 * setting has no effect during this time. Upon promotion, we update the
45 * logical decoding status based on local conditions: the wal_level value and
46 * the presence of logical slots.
47 *
48 * In the future, we could extend support to include automatic transitions
49 * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
50 * this enhancement would require additional coordination mechanisms and
51 * careful implementation of operations such as terminating walsenders and
52 * archiver processes while carefully considering the sequence of operations
53 * to ensure system stability during these transitions.
54 *
55 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
56 * Portions Copyright (c) 1994, Regents of the University of California
57 *
58 * IDENTIFICATION
59 * src/backend/replication/logical/logicalctl.c
60 *
61 *-------------------------------------------------------------------------
62 */
63
64#include "postgres.h"
65
66#include "access/xloginsert.h"
67#include "catalog/pg_control.h"
68#include "miscadmin.h"
69#include "replication/slot.h"
70#include "storage/ipc.h"
71#include "storage/lmgr.h"
72#include "storage/proc.h"
73#include "storage/procarray.h"
75
76/*
77 * Struct for controlling the logical decoding status.
78 *
79 * This struct is protected by LogicalDecodingControlLock.
80 */
82{
83 /*
84 * This is the authoritative value used by all processes to determine
85 * whether to write additional information required by logical decoding to
86 * WAL. Since this information could be checked frequently, each process
87 * caches this value in XLogLogicalInfo for better performance.
88 */
90
91 /* True if logical decoding is available in the system */
93
94 /* True if logical decoding might need to be disabled */
97
99
100/*
101 * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
102 * initialized at process startup, and updated when processing the process
103 * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process
104 * is in an XID-assigned transaction, the cache update is delayed until the
105 * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details.
106 */
107bool XLogLogicalInfo = false;
108
109/*
110 * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
111 * an XID is assigned to the current transaction, the process sets this flag and
112 * delays the XLogLogicalInfo update until the transaction ends. This ensures
113 * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive)
114 * remains consistent throughout the transaction.
115 */
117
118static void update_xlog_logical_info(void);
119static void abort_logical_decoding_activation(int code, Datum arg);
120static void write_logical_decoding_status_update_record(bool status);
121
122Size
124{
125 return sizeof(LogicalDecodingCtlData);
126}
127
128void
130{
131 bool found;
132
133 LogicalDecodingCtl = ShmemInitStruct("Logical decoding control",
135 &found);
136
137 if (!found)
139}
140
141/*
142 * Initialize the logical decoding status in shmem at server startup. This
143 * must be called ONCE during postmaster or standalone-backend startup.
144 */
145void
147{
148 /* Logical decoding is always disabled when 'minimal' WAL level */
150 return;
151
152 /*
153 * Set the initial logical decoding status based on the last status. If
154 * logical decoding was enabled before the last shutdown, it remains
155 * enabled as we might have set wal_level='logical' or have at least one
156 * logical slot.
157 */
160}
161
162/*
163 * Update the XLogLogicalInfo cache.
164 */
165static inline void
167{
169}
170
171/*
172 * Initialize XLogLogicalInfo backend-private cache. This routine is called
173 * during process initialization.
174 */
175void
177{
179}
180
181/*
182 * This routine is called when we are told to update XLogLogicalInfo
183 * by a ProcSignalBarrier.
184 */
185bool
187{
189 {
190 /* Delay updating XLogLogicalInfo until the transaction end */
192 }
193 else
195
196 return true;
197}
198
199/*
200 * Check the shared memory state and return true if logical decoding is
201 * enabled on the system.
202 */
203bool
205{
206 bool enabled;
207
208 LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
210 LWLockRelease(LogicalDecodingControlLock);
211
212 return enabled;
213}
214
215/*
216 * Returns true if logical WAL logging is enabled based on the shared memory
217 * status.
218 */
219bool
221{
222 bool xlog_logical_info;
223
224 LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
225 xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
226 LWLockRelease(LogicalDecodingControlLock);
227
228 return xlog_logical_info;
229}
230
231/*
232 * Reset the local cache at end of the transaction.
233 */
234void
236{
237 /* Update the local cache if there is a pending update */
239 {
242 }
243}
244
245/*
246 * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
247 * status.
248 */
249static void
251{
252 XLogRecPtr recptr;
253
255 XLogRegisterData(&status, sizeof(bool));
257 XLogFlush(recptr);
258}
259
260/*
261 * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
262 * the shared flags to revert the logical decoding activation process.
263 */
264static void
266{
269
270 elog(DEBUG1, "aborting logical decoding activation process");
271
272 /*
273 * Abort the change to xlog_logical_info. We don't need to check
274 * CheckLogicalSlotExists() as we're still holding a logical slot.
275 */
276 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
278 LWLockRelease(LogicalDecodingControlLock);
279
280 /*
281 * Some processes might have already started logical info WAL logging, so
282 * tell all running processes to update their caches. We don't need to
283 * wait for all processes to disable xlog_logical_info locally as it's
284 * always safe to write logical information to WAL records, even when not
285 * strictly required.
286 */
288}
289
290/*
291 * Enable logical decoding if disabled.
292 *
293 * If this function is called during recovery, it simply returns without
294 * action since the logical decoding status change is not allowed during
295 * this time. The logical decoding status depends on the status on the primary.
296 * The caller should use CheckLogicalDecodingRequirements() before calling this
297 * function to make sure that the logical decoding status can be modified.
298 *
299 * Note that there is no interlock between logical decoding activation
300 * and slot creation. To ensure enabling logical decoding, the caller
301 * needs to call this function after creating a logical slot before
302 * initializing the logical decoding context.
303 */
304void
306{
309
310 /* Logical decoding is always enabled */
312 return;
313
314 if (RecoveryInProgress())
315 {
316 /*
317 * CheckLogicalDecodingRequirements() must have already errored out if
318 * logical decoding is not enabled since we cannot enable the logical
319 * decoding status during recovery.
320 */
322 return;
323 }
324
325 /*
326 * Ensure that the activation process is aborted in cases where there is an
327 * interruption during the wait.
328 */
330 {
332 }
334}
335
336/*
337 * A workhorse function to enable logical decoding.
338 */
339void
341{
342 bool in_recovery;
343
344 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
345
346 /* Return if it is already enabled */
348 {
350 LWLockRelease(LogicalDecodingControlLock);
351 return;
352 }
353
354 /*
355 * Set logical info WAL logging in shmem. All processes started after this
356 * point will include the information required by logical decoding in WAL
357 * records.
358 */
360
361 LWLockRelease(LogicalDecodingControlLock);
362
363 /*
364 * Tell all running processes to reflect the xlog_logical_info update, and
365 * wait. This ensures that all running processes have enabled logical
366 * information WAL logging.
367 */
370
371 INJECTION_POINT("logical-decoding-activation", NULL);
372
373 in_recovery = RecoveryInProgress();
374
375 /*
376 * There could be some transactions that might have started with the old
377 * status, but we don't need to wait for these transactions to complete as
378 * long as they have valid XIDs. These transactions will appear in the
379 * xl_running_xacts record and therefore the snapshot builder will not try
380 * to decode the transaction during the logical decoding initialization.
381 *
382 * There is a theoretical case where a transaction decides whether to
383 * include logical-info in WAL records before getting an XID. In this
384 * case, the transaction won't appear in xl_running_xacts.
385 *
386 * For operations that do not require an XID assignment, the process
387 * starts including logical-info immediately upon receiving the signal
388 * (barrier). If such an operation checks the effective_wal_level multiple
389 * times within a single execution, the resulting WAL records might be
390 * inconsistent (i.e., logical-info is included in some records but not in
391 * others). However, this is harmless because logical decoding generally
392 * ignores WAL records that are not associated with an assigned XID.
393 *
394 * One might think we need to wait for all running transactions, including
395 * those without XIDs and read-only transactions, to finish before
396 * enabling logical decoding. However, such a requirement would force the
397 * slot creation to wait for a potentially very long time due to
398 * long-running read queries, which is practically unacceptable.
399 */
400
402
403 /*
404 * We enable logical decoding first, followed by writing the WAL record.
405 * This sequence ensures logical decoding becomes available on the primary
406 * first.
407 */
408 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
409
411
412 if (!in_recovery)
414
416
417 LWLockRelease(LogicalDecodingControlLock);
418
420
421 if (!in_recovery)
422 ereport(LOG,
423 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
424}
425
426/*
427 * Initiate a request for disabling logical decoding.
428 *
429 * Note that this function does not verify whether logical slots exist. The
430 * checkpointer will verify if logical decoding should actually be disabled.
431 */
432void
434{
436 return;
437
438 /*
439 * It's possible that we might not actually need to disable logical
440 * decoding if someone creates a new logical slot concurrently. We set the
441 * flag anyway and the checkpointer will check it and disable logical
442 * decoding if necessary.
443 */
444 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
446 LWLockRelease(LogicalDecodingControlLock);
447
449
450 elog(DEBUG1, "requested disabling logical decoding");
451}
452
453/*
454 * Disable logical decoding if necessary.
455 *
456 * This function disables logical decoding upon a request initiated by
457 * RequestDisableLogicalDecoding(). Otherwise, it performs no action.
458 */
459void
461{
462 bool pending_disable;
463
465 return;
466
467 /*
468 * Sanity check as we cannot disable logical decoding while holding a
469 * logical slot.
470 */
472
473 if (RecoveryInProgress())
474 return;
475
476 LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
477 pending_disable = LogicalDecodingCtl->pending_disable;
478 LWLockRelease(LogicalDecodingControlLock);
479
480 /* Quick return if no pending disable request */
481 if (!pending_disable)
482 return;
483
485}
486
487/*
488 * A workhorse function to disable logical decoding.
489 */
490void
492{
493 bool in_recovery = RecoveryInProgress();
494
495 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
496
497 /*
498 * Check if we can disable logical decoding.
499 *
500 * Skip CheckLogicalSlotExists() check during recovery because the
501 * existing slots will be invalidated after disabling logical decoding.
502 */
504 (!in_recovery && CheckLogicalSlotExists()))
505 {
507 LWLockRelease(LogicalDecodingControlLock);
508 return;
509 }
510
512
513 /*
514 * We need to disable logical decoding first and then disable logical
515 * information WAL logging in order to ensure that no logical decoding
516 * processes WAL records with insufficient information.
517 */
519
520 /* Write the WAL to disable logical decoding on standbys too */
521 if (!in_recovery)
523
524 /* Now disable logical information WAL logging */
527
529
530 if (!in_recovery)
531 ereport(LOG,
532 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
533
534 LWLockRelease(LogicalDecodingControlLock);
535
536 /*
537 * Tell all running processes to reflect the xlog_logical_info update.
538 * Unlike when enabling logical decoding, we don't need to wait for all
539 * processes to complete it in this case. We already disabled logical
540 * decoding and it's always safe to write logical information to WAL
541 * records, even when not strictly required. Therefore, we don't need to
542 * wait for all running transactions to finish either.
543 */
545}
546
547/*
548 * Updates the logical decoding status at end of recovery, and ensures that
549 * all running processes have the updated XLogLogicalInfo status. This
550 * function must be called before accepting writes.
551 */
552void
554{
555 bool new_status = false;
556
558
559 /*
560 * With 'minimal' WAL level, there are no logical replication slots during
561 * recovery. Logical decoding is always disabled, so there is no need to
562 * synchronize XLogLogicalInfo.
563 */
565 {
567 return;
568 }
569
570 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
571
573 new_status = true;
574
575 /*
576 * When recovery ends, we need to either enable or disable logical
577 * decoding based on the wal_level setting and the presence of logical
578 * slots. We need to note that concurrent slot creation and deletion could
579 * happen but WAL writes are still not permitted until recovery fully
580 * completes. Here's how we handle concurrent toggling of logical
581 * decoding:
582 *
583 * For 'enable' case, if there's a concurrent disable request before
584 * recovery fully completes, the checkpointer will handle it after
585 * recovery is done. This means there might be a brief period after
586 * recovery where logical decoding remains enabled even with no logical
587 * replication slots present. This temporary state is not new - it can
588 * already occur due to the checkpointer's asynchronous deactivation
589 * process.
590 *
591 * For 'disable' case, backend cannot create logical replication slots
592 * during recovery (see checks in CheckLogicalDecodingRequirements()),
593 * which prevents a race condition between disabling logical decoding and
594 * concurrent slot creation.
595 */
597 {
598 /*
599 * Update both the logical decoding status and logical WAL logging
600 * status. Unlike toggling these statuses outside of recovery, we don't
601 * need to worry about the operation order as WAL writes are still not
602 * permitted.
603 */
606
607 elog(DEBUG1,
608 "update logical decoding status to %d at the end of recovery",
609 new_status);
610
611 /*
612 * Now that we updated the logical decoding status, clear the pending
613 * disable flag. It's possible that a concurrent process drops the
614 * last logical slot and initiates the pending disable again. The
615 * checkpointer process will check it.
616 */
618
619 LWLockRelease(LogicalDecodingControlLock);
620
622 }
623 else
624 LWLockRelease(LogicalDecodingControlLock);
625
626 /*
627 * Ensure all running processes have the updated status. We don't need to
628 * wait for running transactions to finish as we don't accept any writes
629 * yet. On the other hand, we need to wait for XLogLogicalInfo to be
630 * synchronized even if we've not updated the status above, as the
631 * status may have been turned on and off during recovery, leaving running
632 * processes with different statuses in their local caches.
633 */
637
638 INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
639}
#define MemSet(start, val, len)
Definition: c.h:1019
size_t Size
Definition: c.h:625
void WakeupCheckpointer(void)
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
bool IsUnderPostmaster
Definition: globals.c:120
Assert(PointerIsAligned(start, uint64))
#define INJECTION_POINT(name, arg)
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:52
void InitializeProcessXLogLogicalInfo(void)
Definition: logicalctl.c:176
void LogicalDecodingCtlShmemInit(void)
Definition: logicalctl.c:129
void UpdateLogicalDecodingStatusEndOfRecovery(void)
Definition: logicalctl.c:553
bool IsLogicalDecodingEnabled(void)
Definition: logicalctl.c:204
void EnsureLogicalDecodingEnabled(void)
Definition: logicalctl.c:305
Size LogicalDecodingCtlShmemSize(void)
Definition: logicalctl.c:123
static void abort_logical_decoding_activation(int code, Datum arg)
Definition: logicalctl.c:265
bool ProcessBarrierUpdateXLogLogicalInfo(void)
Definition: logicalctl.c:186
bool XLogLogicalInfo
Definition: logicalctl.c:107
static bool XLogLogicalInfoUpdatePending
Definition: logicalctl.c:116
void AtEOXact_LogicalCtl(void)
Definition: logicalctl.c:235
static void update_xlog_logical_info(void)
Definition: logicalctl.c:166
struct LogicalDecodingCtlData LogicalDecodingCtlData
bool IsXLogLogicalInfoEnabled(void)
Definition: logicalctl.c:220
void StartupLogicalDecodingStatus(bool last_status)
Definition: logicalctl.c:146
void DisableLogicalDecoding(void)
Definition: logicalctl.c:491
static LogicalDecodingCtlData * LogicalDecodingCtl
Definition: logicalctl.c:98
void EnableLogicalDecoding(void)
Definition: logicalctl.c:340
void RequestDisableLogicalDecoding(void)
Definition: logicalctl.c:433
static void write_logical_decoding_status_update_record(bool status)
Definition: logicalctl.c:250
void DisableLogicalDecodingIfNecessary(void)
Definition: logicalctl.c:460
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1178
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1898
@ LW_SHARED
Definition: lwlock.h:113
@ LW_EXCLUSIVE
Definition: lwlock.h:112
#define START_CRIT_SECTION()
Definition: miscadmin.h:150
#define END_CRIT_SECTION()
Definition: miscadmin.h:152
void * arg
#define XLOG_LOGICAL_DECODING_STATUS_CHANGE
Definition: pg_control.h:84
uint64_t Datum
Definition: postgres.h:70
void WaitForProcSignalBarrier(uint64 generation)
Definition: procsignal.c:424
uint64 EmitProcSignalBarrier(ProcSignalBarrierType type)
Definition: procsignal.c:356
@ PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO
Definition: procsignal.h:57
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
bool CheckLogicalSlotExists(void)
Definition: slot.c:1612
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
#define InvalidTransactionId
Definition: transam.h:31
TransactionId GetTopTransactionIdIfAny(void)
Definition: xact.c:442
bool RecoveryInProgress(void)
Definition: xlog.c:6461
int wal_level
Definition: xlog.c:134
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2784
@ WAL_LEVEL_REPLICA
Definition: xlog.h:76
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:77
@ WAL_LEVEL_MINIMAL
Definition: xlog.h:75
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:368
void XLogBeginInsert(void)
Definition: xloginsert.c:152