PostgreSQL Source Code git master
Loading...
Searching...
No Matches
logicalctl.c File Reference
#include "postgres.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "miscadmin.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/injection_point.h"
Include dependency graph for logicalctl.c:

Go to the source code of this file.

Data Structures

struct  LogicalDecodingCtlData
 

Typedefs

typedef struct LogicalDecodingCtlData LogicalDecodingCtlData
 

Functions

static void update_xlog_logical_info (void)
 
static void abort_logical_decoding_activation (int code, Datum arg)
 
static void write_logical_decoding_status_update_record (bool status)
 
Size LogicalDecodingCtlShmemSize (void)
 
void LogicalDecodingCtlShmemInit (void)
 
void StartupLogicalDecodingStatus (bool last_status)
 
void InitializeProcessXLogLogicalInfo (void)
 
bool ProcessBarrierUpdateXLogLogicalInfo (void)
 
bool IsLogicalDecodingEnabled (void)
 
bool IsXLogLogicalInfoEnabled (void)
 
void AtEOXact_LogicalCtl (void)
 
void EnsureLogicalDecodingEnabled (void)
 
void EnableLogicalDecoding (void)
 
void RequestDisableLogicalDecoding (void)
 
void DisableLogicalDecodingIfNecessary (void)
 
void DisableLogicalDecoding (void)
 
void UpdateLogicalDecodingStatusEndOfRecovery (void)
 

Variables

static LogicalDecodingCtlDataLogicalDecodingCtl = NULL
 
bool XLogLogicalInfo = false
 
static bool XLogLogicalInfoUpdatePending = false
 

Typedef Documentation

◆ LogicalDecodingCtlData

Function Documentation

◆ abort_logical_decoding_activation()

static void abort_logical_decoding_activation ( int  code,
Datum  arg 
)
static

Definition at line 266 of file logicalctl.c.

267{
270
271 elog(DEBUG1, "aborting logical decoding activation process");
272
273 /*
274 * Abort the change to xlog_logical_info. We don't need to check
275 * CheckLogicalSlotExists() as we're still holding a logical slot.
276 */
280
281 /*
282 * Some processes might have already started logical info WAL logging, so
283 * tell all running processes to update their caches. We don't need to
284 * wait for all processes to disable xlog_logical_info locally as it's
285 * always safe to write logical information to WAL records, even when not
286 * strictly required.
287 */
289}
#define Assert(condition)
Definition c.h:885
#define DEBUG1
Definition elog.h:30
#define elog(elevel,...)
Definition elog.h:226
static LogicalDecodingCtlData * LogicalDecodingCtl
Definition logicalctl.c:99
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_EXCLUSIVE
Definition lwlock.h:112
static int fb(int x)
uint64 EmitProcSignalBarrier(ProcSignalBarrierType type)
Definition procsignal.c:358
@ PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO
Definition procsignal.h:49
ReplicationSlot * MyReplicationSlot
Definition slot.c:148

References Assert, DEBUG1, elog, EmitProcSignalBarrier(), fb(), LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyReplicationSlot, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, and LogicalDecodingCtlData::xlog_logical_info.

Referenced by EnsureLogicalDecodingEnabled().

◆ AtEOXact_LogicalCtl()

void AtEOXact_LogicalCtl ( void  )

Definition at line 236 of file logicalctl.c.

237{
238 /* Update the local cache if there is a pending update */
240 {
243 }
244}
static bool XLogLogicalInfoUpdatePending
Definition logicalctl.c:117
static void update_xlog_logical_info(void)
Definition logicalctl.c:167

References update_xlog_logical_info(), and XLogLogicalInfoUpdatePending.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ DisableLogicalDecoding()

void DisableLogicalDecoding ( void  )

Definition at line 492 of file logicalctl.c.

493{
495
497
498 /*
499 * Check if we can disable logical decoding.
500 *
501 * Skip CheckLogicalSlotExists() check during recovery because the
502 * existing slots will be invalidated after disabling logical decoding.
503 */
506 {
509 return;
510 }
511
513
514 /*
515 * We need to disable logical decoding first and then disable logical
516 * information WAL logging in order to ensure that no logical decoding
517 * processes WAL records with insufficient information.
518 */
520
521 /* Write the WAL to disable logical decoding on standbys too */
522 if (!in_recovery)
524
525 /* Now disable logical information WAL logging */
528
530
531 if (!in_recovery)
532 ereport(LOG,
533 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
534
536
537 /*
538 * Tell all running processes to reflect the xlog_logical_info update.
539 * Unlike when enabling logical decoding, we don't need to wait for all
540 * processes to complete it in this case. We already disabled logical
541 * decoding and it's always safe to write logical information to WAL
542 * records, even when not strictly required. Therefore, we don't need to
543 * wait for all running transactions to finish either.
544 */
546}
int errmsg(const char *fmt,...)
Definition elog.c:1093
#define LOG
Definition elog.h:31
#define ereport(elevel,...)
Definition elog.h:150
static void write_logical_decoding_status_update_record(bool status)
Definition logicalctl.c:251
#define START_CRIT_SECTION()
Definition miscadmin.h:150
#define END_CRIT_SECTION()
Definition miscadmin.h:152
bool CheckLogicalSlotExists(void)
Definition slot.c:1615
bool RecoveryInProgress(void)
Definition xlog.c:6460

References CheckLogicalSlotExists(), EmitProcSignalBarrier(), END_CRIT_SECTION, ereport, errmsg(), fb(), LOG, LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), START_CRIT_SECTION, write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by DisableLogicalDecodingIfNecessary(), and xlog_redo().

◆ DisableLogicalDecodingIfNecessary()

void DisableLogicalDecodingIfNecessary ( void  )

Definition at line 461 of file logicalctl.c.

462{
463 bool pending_disable;
464
466 return;
467
468 /*
469 * Sanity check as we cannot disable logical decoding while holding a
470 * logical slot.
471 */
473
474 if (RecoveryInProgress())
475 return;
476
478 pending_disable = LogicalDecodingCtl->pending_disable;
480
481 /* Quick return if no pending disable request */
482 if (!pending_disable)
483 return;
484
486}
void DisableLogicalDecoding(void)
Definition logicalctl.c:492
@ LW_SHARED
Definition lwlock.h:113
int wal_level
Definition xlog.c:134
@ WAL_LEVEL_REPLICA
Definition xlog.h:76

References Assert, DisableLogicalDecoding(), fb(), LogicalDecodingCtl, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyReplicationSlot, LogicalDecodingCtlData::pending_disable, RecoveryInProgress(), wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckpointerMain().

◆ EnableLogicalDecoding()

void EnableLogicalDecoding ( void  )

Definition at line 341 of file logicalctl.c.

342{
343 bool in_recovery;
344
346
347 /* Return if it is already enabled */
349 {
352 return;
353 }
354
355 /*
356 * Set logical info WAL logging in shmem. All process starts after this
357 * point will include the information required by logical decoding to WAL
358 * records.
359 */
361
363
364 /*
365 * Tell all running processes to reflect the xlog_logical_info update, and
366 * wait. This ensures that all running processes have enabled logical
367 * information WAL logging.
368 */
371
372 INJECTION_POINT("logical-decoding-activation", NULL);
373
375
376 /*
377 * There could be some transactions that might have started with the old
378 * status, but we don't need to wait for these transactions to complete as
379 * long as they have valid XIDs. These transactions will appear in the
380 * xl_running_xacts record and therefore the snapshot builder will not try
381 * to decode the transaction during the logical decoding initialization.
382 *
383 * There is a theoretical case where a transaction decides whether to
384 * include logical-info to WAL records before getting an XID. In this
385 * case, the transaction won't appear in xl_running_xacts.
386 *
387 * For operations that do not require an XID assignment, the process
388 * starts including logical-info immediately upon receiving the signal
389 * (barrier). If such an operation checks the effective_wal_level multiple
390 * times within a single execution, the resulting WAL records might be
391 * inconsistent (i.e., logical-info is included in some records but not in
392 * others). However, this is harmless because logical decoding generally
393 * ignores WAL records that are not associated with an assigned XID.
394 *
395 * One might think we need to wait for all running transactions, including
396 * those without XIDs and read-only transactions, to finish before
397 * enabling logical decoding. However, such a requirement would force the
398 * slot creation to wait for a potentially very long time due to
399 * long-running read queries, which is practically unacceptable.
400 */
401
403
404 /*
405 * We enable logical decoding first, followed by writing the WAL record.
406 * This sequence ensures logical decoding becomes available on the primary
407 * first.
408 */
410
412
413 if (!in_recovery)
415
417
419
421
422 if (!in_recovery)
423 ereport(LOG,
424 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
425}
#define INJECTION_POINT(name, arg)
void WaitForProcSignalBarrier(uint64 generation)
Definition procsignal.c:426

References EmitProcSignalBarrier(), END_CRIT_SECTION, ereport, errmsg(), fb(), INJECTION_POINT, LOG, LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), START_CRIT_SECTION, WaitForProcSignalBarrier(), write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by EnsureLogicalDecodingEnabled(), and xlog_redo().

◆ EnsureLogicalDecodingEnabled()

void EnsureLogicalDecodingEnabled ( void  )

Definition at line 306 of file logicalctl.c.

307{
310
311 /* Logical decoding is always enabled */
313 return;
314
315 if (RecoveryInProgress())
316 {
317 /*
318 * CheckLogicalDecodingRequirements() must have already errored out if
319 * logical decoding is not enabled since we cannot enable the logical
320 * decoding status during recovery.
321 */
323 return;
324 }
325
326 /*
327 * Ensure to abort the activation process in cases where there in an
328 * interruption during the wait.
329 */
331 {
333 }
335}
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:52
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:205
static void abort_logical_decoding_activation(int code, Datum arg)
Definition logicalctl.c:266
void EnableLogicalDecoding(void)
Definition logicalctl.c:341
uint64_t Datum
Definition postgres.h:70
@ WAL_LEVEL_LOGICAL
Definition xlog.h:77

References abort_logical_decoding_activation(), Assert, EnableLogicalDecoding(), IsLogicalDecodingEnabled(), MyReplicationSlot, PG_END_ENSURE_ERROR_CLEANUP, PG_ENSURE_ERROR_CLEANUP, RecoveryInProgress(), wal_level, WAL_LEVEL_LOGICAL, and WAL_LEVEL_REPLICA.

Referenced by create_logical_replication_slot(), and CreateReplicationSlot().

◆ InitializeProcessXLogLogicalInfo()

void InitializeProcessXLogLogicalInfo ( void  )

Definition at line 177 of file logicalctl.c.

178{
180}

References update_xlog_logical_info().

Referenced by BaseInit().

◆ IsLogicalDecodingEnabled()

◆ IsXLogLogicalInfoEnabled()

bool IsXLogLogicalInfoEnabled ( void  )

◆ LogicalDecodingCtlShmemInit()

void LogicalDecodingCtlShmemInit ( void  )

Definition at line 130 of file logicalctl.c.

131{
132 bool found;
133
134 LogicalDecodingCtl = ShmemInitStruct("Logical decoding control",
136 &found);
137
138 if (!found)
140}
#define MemSet(start, val, len)
Definition c.h:1035
Size LogicalDecodingCtlShmemSize(void)
Definition logicalctl.c:124
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378

References LogicalDecodingCtl, LogicalDecodingCtlShmemSize(), MemSet, and ShmemInitStruct().

Referenced by CreateOrAttachShmemStructs().

◆ LogicalDecodingCtlShmemSize()

Size LogicalDecodingCtlShmemSize ( void  )

Definition at line 124 of file logicalctl.c.

125{
126 return sizeof(LogicalDecodingCtlData);
127}

Referenced by CalculateShmemSize(), and LogicalDecodingCtlShmemInit().

◆ ProcessBarrierUpdateXLogLogicalInfo()

bool ProcessBarrierUpdateXLogLogicalInfo ( void  )

Definition at line 187 of file logicalctl.c.

188{
190 {
191 /* Delay updating XLogLogicalInfo until the transaction end */
193 }
194 else
196
197 return true;
198}
#define InvalidTransactionId
Definition transam.h:31
TransactionId GetTopTransactionIdIfAny(void)
Definition xact.c:442

References GetTopTransactionIdIfAny(), InvalidTransactionId, update_xlog_logical_info(), and XLogLogicalInfoUpdatePending.

Referenced by ProcessProcSignalBarrier().

◆ RequestDisableLogicalDecoding()

void RequestDisableLogicalDecoding ( void  )

Definition at line 434 of file logicalctl.c.

435{
437 return;
438
439 /*
440 * It's possible that we might not actually need to disable logical
441 * decoding if someone creates a new logical slot concurrently. We set the
442 * flag anyway and the checkpointer will check it and disable logical
443 * decoding if necessary.
444 */
448
450
451 elog(DEBUG1, "requested disabling logical decoding");
452}
void WakeupCheckpointer(void)

References DEBUG1, elog, fb(), LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, WakeupCheckpointer(), wal_level, and WAL_LEVEL_REPLICA.

Referenced by InvalidateObsoleteReplicationSlots(), ReplicationSlotCleanup(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ StartupLogicalDecodingStatus()

void StartupLogicalDecodingStatus ( bool  last_status)

Definition at line 147 of file logicalctl.c.

148{
149 /* Logical decoding is always disabled when 'minimal' WAL level */
151 return;
152
153 /*
154 * Set the initial logical decoding status based on the last status. If
155 * logical decoding was enabled before the last shutdown, it remains
156 * enabled as we might have set wal_level='logical' or have at least one
157 * logical slot.
158 */
161}
@ WAL_LEVEL_MINIMAL
Definition xlog.h:75

References fb(), LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, wal_level, WAL_LEVEL_MINIMAL, and LogicalDecodingCtlData::xlog_logical_info.

Referenced by StartupXLOG().

◆ update_xlog_logical_info()

static void update_xlog_logical_info ( void  )
inlinestatic

Definition at line 167 of file logicalctl.c.

168{
170}
bool XLogLogicalInfo
Definition logicalctl.c:108
bool IsXLogLogicalInfoEnabled(void)
Definition logicalctl.c:221

References IsXLogLogicalInfoEnabled(), and XLogLogicalInfo.

Referenced by AtEOXact_LogicalCtl(), InitializeProcessXLogLogicalInfo(), and ProcessBarrierUpdateXLogLogicalInfo().

◆ UpdateLogicalDecodingStatusEndOfRecovery()

void UpdateLogicalDecodingStatusEndOfRecovery ( void  )

Definition at line 554 of file logicalctl.c.

555{
556 bool new_status = false;
557
559
560 /*
561 * With 'minimal' WAL level, there are no logical replication slots during
562 * recovery. Logical decoding is always disabled, so there is no need to
563 * synchronize XLogLogicalInfo.
564 */
566 {
568 return;
569 }
570
572
574 new_status = true;
575
576 /*
577 * When recovery ends, we need to either enable or disable logical
578 * decoding based on the wal_level setting and the presence of logical
579 * slots. We need to note that concurrent slot creation and deletion could
580 * happen but WAL writes are still not permitted until recovery fully
581 * completes. Here's how we handle concurrent toggling of logical
582 * decoding:
583 *
584 * For 'enable' case, if there's a concurrent disable request before
585 * recovery fully completes, the checkpointer will handle it after
586 * recovery is done. This means there might be a brief period after
587 * recovery where logical decoding remains enabled even with no logical
588 * replication slots present. This temporary state is not new - it can
589 * already occur due to the checkpointer's asynchronous deactivation
590 * process.
591 *
592 * For 'disable' case, backend cannot create logical replication slots
593 * during recovery (see checks in CheckLogicalDecodingRequirements()),
594 * which prevents a race condition between disabling logical decoding and
595 * concurrent slot creation.
596 */
598 {
599 /*
600 * Update both the logical decoding status and logical WAL logging
601 * status. Unlike toggling these status during non-recovery, we don't
602 * need to worry about the operation order as WAL writes are still not
603 * permitted.
604 */
607
608 elog(DEBUG1,
609 "update logical decoding status to %d at the end of recovery",
610 new_status);
611
612 /*
613 * Now that we updated the logical decoding status, clear the pending
614 * disable flag. It's possible that a concurrent process drops the
615 * last logical slot and initiates the pending disable again. The
616 * checkpointer process will check it.
617 */
619
621
623 }
624 else
626
627 /*
628 * Ensure all running processes have the updated status. We don't need to
629 * wait for running transactions to finish as we don't accept any writes
630 * yet. On the other hand, we need to wait for synchronizing
631 * XLogLogicalInfo even if we've not updated the status above as the
632 * status have been turned on and off during recovery, having running
633 * processes have different status on their local caches.
634 */
638
639 INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
640}
bool IsUnderPostmaster
Definition globals.c:120

References Assert, CheckLogicalSlotExists(), DEBUG1, elog, EmitProcSignalBarrier(), fb(), INJECTION_POINT, IsLogicalDecodingEnabled(), IsUnderPostmaster, IsXLogLogicalInfoEnabled(), LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), WaitForProcSignalBarrier(), wal_level, WAL_LEVEL_LOGICAL, WAL_LEVEL_MINIMAL, write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by StartupXLOG().

◆ write_logical_decoding_status_update_record()

static void write_logical_decoding_status_update_record ( bool  status)
static

Definition at line 251 of file logicalctl.c.

252{
254
256 XLogRegisterData(&status, sizeof(bool));
259}
#define XLOG_LOGICAL_DECODING_STATUS_CHANGE
Definition pg_control.h:84
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2783
uint64 XLogRecPtr
Definition xlogdefs.h:21
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:368
void XLogBeginInsert(void)
Definition xloginsert.c:152

References fb(), XLOG_LOGICAL_DECODING_STATUS_CHANGE, XLogBeginInsert(), XLogFlush(), XLogInsert(), and XLogRegisterData().

Referenced by DisableLogicalDecoding(), EnableLogicalDecoding(), and UpdateLogicalDecodingStatusEndOfRecovery().

Variable Documentation

◆ LogicalDecodingCtl

◆ XLogLogicalInfo

bool XLogLogicalInfo = false

Definition at line 108 of file logicalctl.c.

Referenced by update_xlog_logical_info().

◆ XLogLogicalInfoUpdatePending

bool XLogLogicalInfoUpdatePending = false
static

Definition at line 117 of file logicalctl.c.

Referenced by AtEOXact_LogicalCtl(), and ProcessBarrierUpdateXLogLogicalInfo().