PostgreSQL Source Code git master
Loading...
Searching...
No Matches
logicalctl.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * logicalctl.c
3 * Functionality to control logical decoding status online.
4 *
5 * This module enables dynamic control of logical decoding availability.
6 * Logical decoding becomes active under two conditions: when the wal_level
7 * parameter is set to 'logical', or when at least one valid logical replication
8 * slot exists with wal_level set to 'replica'. The system disables logical
9 * decoding when neither condition is met. Therefore, the dynamic control
10 * of logical decoding availability is required only when wal_level is set
11 * to 'replica'. Logical decoding is always enabled when wal_level='logical'
12 * and always disabled when wal_level='minimal'.
13 *
14 * The core concept of dynamically enabling and disabling logical decoding
15 * is to separately control two aspects: writing information required for
16 * logical decoding to WAL records, and using logical decoding itself. During
17 * activation, we first enable logical WAL writing while keeping logical
18 * decoding disabled. This change is reflected in the read-only
19 * effective_wal_level GUC parameter. Once we ensure that all processes have
20 * updated to the latest effective_wal_level value, we then enable logical
21 * decoding. Deactivation follows a similar careful, multi-step process
22 * in reverse order.
23 *
24 * While activation occurs synchronously right after creating the first
25 * logical slot, deactivation happens asynchronously through the checkpointer
26 * process. This design avoids a race condition at the end of recovery; see
27 * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
28 * Asynchronous deactivation also avoids excessive toggling of the logical
29 * decoding status in workloads that repeatedly create and drop a single
30 * logical slot. On the other hand, this lazy approach can delay changes
31 * to effective_wal_level and the disabling of logical decoding, especially
32 * when the checkpointer is busy with other tasks. We chose this lazy approach
33 * in all deactivation paths to keep the implementation simple, even though
34 * laziness is strictly required only for end-of-recovery cases. Future work
35 * might address this limitation either by using a dedicated worker instead
36 * of the checkpointer, or by implementing synchronous waiting during slot
37 * drops if workloads are significantly affected by the lazy deactivation
38 * of logical decoding.
39 *
40 * Standby servers use the primary server's effective_wal_level and logical
41 * decoding status. Unlike normal activation and deactivation, these
42 * are updated simultaneously without status change coordination, solely by
43 * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
44 * setting has no effect during this time. Upon promotion, we update the
45 * logical decoding status based on local conditions: the wal_level value and
46 * the presence of logical slots.
47 *
48 * In the future, we could extend support to include automatic transitions
49 * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
50 * this enhancement would require additional coordination mechanisms and
51 * careful implementation of operations such as terminating walsenders and
52 * archiver processes while carefully considering the sequence of operations
53 * to ensure system stability during these transitions.
54 *
55 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
56 * Portions Copyright (c) 1994, Regents of the University of California
57 *
58 * IDENTIFICATION
59 * src/backend/replication/logical/logicalctl.c
60 *
61 *-------------------------------------------------------------------------
62 */
63
64#include "postgres.h"
65
66#include "access/xloginsert.h"
67#include "catalog/pg_control.h"
68#include "miscadmin.h"
69#include "replication/slot.h"
70#include "storage/ipc.h"
71#include "storage/lmgr.h"
72#include "storage/proc.h"
73#include "storage/procarray.h"
74#include "storage/procsignal.h"
75#include "storage/subsystems.h"
77
78/*
79 * Struct for controlling the logical decoding status.
80 *
81 * This struct is protected by LogicalDecodingControlLock.
82 */
84{
85 /*
86 * This is the authoritative value used by all processes to determine
87 * whether to write additional information required by logical decoding to
88 * WAL. Since this information could be checked frequently, each process
89 * caches this value in XLogLogicalInfo for better performance.
90 */
92
93 /* True if logical decoding is available in the system */
95
96 /* True if logical decoding might need to be disabled */
99
101
102static void LogicalDecodingCtlShmemRequest(void *arg);
103
107
108/*
109 * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
110 * initialized at process startup, and updated when processing the process
111 * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process
112 * is in an XID-assigned transaction, the cache update is delayed until the
113 * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details.
114 */
115bool XLogLogicalInfo = false;
116
117/*
118 * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
119 * an XID is assigned to the current transaction, the process sets this flag and
120 * delays the XLogLogicalInfo update until the transaction ends. This ensures
121 * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive)
122 * remains consistent throughout the transaction.
123 */
125
126static void update_xlog_logical_info(void);
127static void abort_logical_decoding_activation(int code, Datum arg);
128static void write_logical_decoding_status_update_record(bool status);
129
130static void
132{
133 ShmemRequestStruct(.name = "Logical decoding control",
134 .size = sizeof(LogicalDecodingCtlData),
135 .ptr = (void **) &LogicalDecodingCtl,
136 );
137}
138
139/*
140 * Initialize the logical decoding status in shmem at server startup. This
141 * must be called ONCE during postmaster or standalone-backend startup.
142 */
143void
145{
146 /* Logical decoding is always disabled when wal_level is 'minimal' */
148 return;
149
150 /*
151 * Set the initial logical decoding status based on the last status. If
152 * logical decoding was enabled before the last shutdown, it remains
153 * enabled as we might have set wal_level='logical' or have at least one
154 * logical slot.
155 */
158}
159
160/*
161 * Update the XLogLogicalInfo cache.
162 */
163static inline void
168
169/*
170 * Initialize XLogLogicalInfo backend-private cache. This routine is called
171 * during process initialization.
172 */
173void
178
179/*
180 * This routine is called when we are told to update XLogLogicalInfo
181 * by a ProcSignalBarrier.
182 */
183bool
185{
187 {
188 /* Delay updating XLogLogicalInfo until the transaction end */
190 }
191 else
193
194 return true;
195}
196
197/*
198 * Check the shared memory state and return true if logical decoding is
199 * enabled on the system.
200 */
201bool
212
213/*
214 * Returns true if logical WAL logging is enabled based on the shared memory
215 * status.
216 */
217bool
219{
220 bool xlog_logical_info;
221
223 xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
225
226 return xlog_logical_info;
227}
228
229/*
230 * Reset the local cache at end of the transaction.
231 */
232void
234{
235 /* Update the local cache if there is a pending update */
237 {
240 }
241}
242
243/*
244 * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
245 * status.
246 */
247static void
257
258/*
259 * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
260 * the shared flags to revert the logical decoding activation process.
261 */
262static void
264{
267
268 elog(DEBUG1, "aborting logical decoding activation process");
269
270 /*
271 * Abort the change to xlog_logical_info. We don't need to check
272 * CheckLogicalSlotExists() as we're still holding a logical slot.
273 */
277
278 /*
279 * Some processes might have already started logical info WAL logging, so
280 * tell all running processes to update their caches. We don't need to
281 * wait for all processes to disable xlog_logical_info locally as it's
282 * always safe to write logical information to WAL records, even when not
283 * strictly required.
284 */
286}
287
288/*
289 * Enable logical decoding if disabled.
290 *
291 * If this function is called during recovery, it simply returns without
292 * action since the logical decoding status change is not allowed during
293 * this time. The logical decoding status depends on the status on the primary.
294 * The caller should use CheckLogicalDecodingRequirements() before calling this
295 * function to make sure that the logical decoding status can be modified.
296 *
297 * Note that there is no interlock between logical decoding activation
298 * and slot creation. To ensure enabling logical decoding, the caller
299 * needs to call this function after creating a logical slot before
300 * initializing the logical decoding context.
301 */
302void
304{
307
308 /* Logical decoding is always enabled */
310 return;
311
312 if (RecoveryInProgress())
313 {
314 /*
315 * CheckLogicalDecodingRequirements() must have already errored out if
316 * logical decoding is not enabled since we cannot enable the logical
317 * decoding status during recovery.
318 */
320 return;
321 }
322
323 /*
324 * Ensure to abort the activation process in cases where there is an
325 * interruption during the wait.
326 */
328 {
330 }
332}
333
334/*
335 * A workhorse function to enable logical decoding.
336 */
337void
339{
340 bool in_recovery;
341
343
344 /* Return if it is already enabled */
346 {
349 return;
350 }
351
352 /*
353 * Set logical info WAL logging in shmem. All processes that start after
354 * this point will include the information required by logical decoding in
355 * WAL records.
356 */
358
360
361 /*
362 * Tell all running processes to reflect the xlog_logical_info update, and
363 * wait. This ensures that all running processes have enabled logical
364 * information WAL logging.
365 */
368
369 INJECTION_POINT("logical-decoding-activation", NULL);
370
372
373 /*
374 * There could be some transactions that might have started with the old
375 * status, but we don't need to wait for these transactions to complete as
376 * long as they have valid XIDs. These transactions will appear in the
377 * xl_running_xacts record and therefore the snapshot builder will not try
378 * to decode the transaction during the logical decoding initialization.
379 *
380 * There is a theoretical case where a transaction decides whether to
381 * include logical-info in WAL records before getting an XID. In this
382 * case, the transaction won't appear in xl_running_xacts.
383 *
384 * For operations that do not require an XID assignment, the process
385 * starts including logical-info immediately upon receiving the signal
386 * (barrier). If such an operation checks the effective_wal_level multiple
387 * times within a single execution, the resulting WAL records might be
388 * inconsistent (i.e., logical-info is included in some records but not in
389 * others). However, this is harmless because logical decoding generally
390 * ignores WAL records that are not associated with an assigned XID.
391 *
392 * One might think we need to wait for all running transactions, including
393 * those without XIDs and read-only transactions, to finish before
394 * enabling logical decoding. However, such a requirement would force the
395 * slot creation to wait for a potentially very long time due to
396 * long-running read queries, which is practically unacceptable.
397 */
398
400
401 /*
402 * We enable logical decoding first, followed by writing the WAL record.
403 * This sequence ensures logical decoding becomes available on the primary
404 * first.
405 */
407
409
410 if (!in_recovery)
412
414
416
418
419 if (!in_recovery)
420 ereport(LOG,
421 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
422}
423
424/*
425 * Initiate a request for disabling logical decoding.
426 *
427 * Note that this function does not verify whether logical slots exist. The
428 * checkpointer will verify if logical decoding should actually be disabled.
429 */
430void
432{
434 return;
435
436 /*
437 * It's possible that we might not actually need to disable logical
438 * decoding if someone creates a new logical slot concurrently. We set the
439 * flag anyway and the checkpointer will check it and disable logical
440 * decoding if necessary.
441 */
445
447
448 elog(DEBUG1, "requested disabling logical decoding");
449}
450
451/*
452 * Disable logical decoding if necessary.
453 *
454 * This function disables logical decoding upon a request initiated by
455 * RequestDisableLogicalDecoding(). Otherwise, it performs no action.
456 */
457void
459{
460 bool pending_disable;
461
463 return;
464
465 /*
466 * Sanity check as we cannot disable logical decoding while holding a
467 * logical slot.
468 */
470
471 if (RecoveryInProgress())
472 return;
473
475 pending_disable = LogicalDecodingCtl->pending_disable;
477
478 /* Quick return if no pending disable request */
479 if (!pending_disable)
480 return;
481
483}
484
485/*
486 * A workhorse function to disable logical decoding.
487 */
488void
490{
492
494
495 /*
496 * Check if we can disable logical decoding.
497 *
498 * Skip CheckLogicalSlotExists() check during recovery because the
499 * existing slots will be invalidated after disabling logical decoding.
500 */
503 {
506 return;
507 }
508
510
511 /*
512 * We need to disable logical decoding first and then disable logical
513 * information WAL logging in order to ensure that no logical decoding
514 * processes WAL records with insufficient information.
515 */
517
518 /* Write the WAL to disable logical decoding on standbys too */
519 if (!in_recovery)
521
522 /* Now disable logical information WAL logging */
525
527
528 if (!in_recovery)
529 ereport(LOG,
530 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
531
533
534 /*
535 * Tell all running processes to reflect the xlog_logical_info update.
536 * Unlike when enabling logical decoding, we don't need to wait for all
537 * processes to complete it in this case. We already disabled logical
538 * decoding and it's always safe to write logical information to WAL
539 * records, even when not strictly required. Therefore, we don't need to
540 * wait for all running transactions to finish either.
541 */
543}
544
545/*
546 * Updates the logical decoding status at end of recovery, and ensures that
547 * all running processes have the updated XLogLogicalInfo status. This
548 * function must be called before accepting writes.
549 */
550void
552{
553 bool new_status = false;
554
556
557 /*
558 * With 'minimal' WAL level, there are no logical replication slots during
559 * recovery. Logical decoding is always disabled, so there is no need to
560 * synchronize XLogLogicalInfo.
561 */
563 {
565 return;
566 }
567
569
571 new_status = true;
572
573 /*
574 * When recovery ends, we need to either enable or disable logical
575 * decoding based on the wal_level setting and the presence of logical
576 * slots. We need to note that concurrent slot creation and deletion could
577 * happen but WAL writes are still not permitted until recovery fully
578 * completes. Here's how we handle concurrent toggling of logical
579 * decoding:
580 *
581 * For the 'enable' case, if there's a concurrent disable request before
582 * recovery fully completes, the checkpointer will handle it after
583 * recovery is done. This means there might be a brief period after
584 * recovery where logical decoding remains enabled even with no logical
585 * replication slots present. This temporary state is not new - it can
586 * already occur due to the checkpointer's asynchronous deactivation
587 * process.
588 *
589 * For the 'disable' case, backends cannot create logical replication slots
590 * during recovery (see checks in CheckLogicalDecodingRequirements()),
591 * which prevents a race condition between disabling logical decoding and
592 * concurrent slot creation.
593 */
595 {
596 /*
597 * Update both the logical decoding status and logical WAL logging
598 * status. Unlike toggling these statuses outside of recovery, we don't
599 * need to worry about the operation order as WAL writes are still not
600 * permitted.
601 */
604
605 elog(DEBUG1,
606 "update logical decoding status to %d at the end of recovery",
607 new_status);
608
609 /*
610 * Now that we updated the logical decoding status, clear the pending
611 * disable flag. It's possible that a concurrent process drops the
612 * last logical slot and initiates the pending disable again. The
613 * checkpointer process will check it.
614 */
616
618
620 }
621 else
623
624 /*
625 * Ensure all running processes have the updated status. We don't need to
626 * wait for running transactions to finish as we don't accept any writes
627 * yet. On the other hand, we need to wait for XLogLogicalInfo to be
628 * synchronized even if we've not updated the status above, as the
629 * status may have been turned on and off during recovery, leaving running
630 * processes with different statuses in their local caches.
631 */
635
636 INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
637}
#define Assert(condition)
Definition c.h:943
void WakeupCheckpointer(void)
Datum arg
Definition elog.c:1322
#define LOG
Definition elog.h:32
#define DEBUG1
Definition elog.h:31
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
bool IsUnderPostmaster
Definition globals.c:122
#define INJECTION_POINT(name, arg)
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:52
void InitializeProcessXLogLogicalInfo(void)
Definition logicalctl.c:174
void UpdateLogicalDecodingStatusEndOfRecovery(void)
Definition logicalctl.c:551
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:202
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:303
static void abort_logical_decoding_activation(int code, Datum arg)
Definition logicalctl.c:263
bool ProcessBarrierUpdateXLogLogicalInfo(void)
Definition logicalctl.c:184
bool XLogLogicalInfo
Definition logicalctl.c:115
static bool XLogLogicalInfoUpdatePending
Definition logicalctl.c:124
void AtEOXact_LogicalCtl(void)
Definition logicalctl.c:233
static void update_xlog_logical_info(void)
Definition logicalctl.c:164
bool IsXLogLogicalInfoEnabled(void)
Definition logicalctl.c:218
void StartupLogicalDecodingStatus(bool last_status)
Definition logicalctl.c:144
void DisableLogicalDecoding(void)
Definition logicalctl.c:489
static LogicalDecodingCtlData * LogicalDecodingCtl
Definition logicalctl.c:100
void EnableLogicalDecoding(void)
Definition logicalctl.c:338
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:431
const ShmemCallbacks LogicalDecodingCtlShmemCallbacks
Definition logicalctl.c:104
static void write_logical_decoding_status_update_record(bool status)
Definition logicalctl.c:248
void DisableLogicalDecodingIfNecessary(void)
Definition logicalctl.c:458
static void LogicalDecodingCtlShmemRequest(void *arg)
Definition logicalctl.c:131
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
#define START_CRIT_SECTION()
Definition miscadmin.h:152
#define END_CRIT_SECTION()
Definition miscadmin.h:154
static char * errmsg
#define XLOG_LOGICAL_DECODING_STATUS_CHANGE
Definition pg_control.h:87
uint64_t Datum
Definition postgres.h:70
static int fb(int x)
void WaitForProcSignalBarrier(uint64 generation)
Definition procsignal.c:428
uint64 EmitProcSignalBarrier(ProcSignalBarrierType type)
Definition procsignal.c:360
@ PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO
Definition procsignal.h:51
#define ShmemRequestStruct(...)
Definition shmem.h:176
bool CheckLogicalSlotExists(void)
Definition slot.c:1623
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
ShmemRequestCallback request_fn
Definition shmem.h:133
#define InvalidTransactionId
Definition transam.h:31
const char * name
TransactionId GetTopTransactionIdIfAny(void)
Definition xact.c:443
bool RecoveryInProgress(void)
Definition xlog.c:6830
int wal_level
Definition xlog.c:138
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
@ WAL_LEVEL_REPLICA
Definition xlog.h:77
@ WAL_LEVEL_LOGICAL
Definition xlog.h:78
@ WAL_LEVEL_MINIMAL
Definition xlog.h:76
uint64 XLogRecPtr
Definition xlogdefs.h:21
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:482
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:372
void XLogBeginInsert(void)
Definition xloginsert.c:153