PostgreSQL Source Code git master
logicalctl.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

Size LogicalDecodingCtlShmemSize (void)
 
void LogicalDecodingCtlShmemInit (void)
 
void StartupLogicalDecodingStatus (bool last_status)
 
void InitializeProcessXLogLogicalInfo (void)
 
bool ProcessBarrierUpdateXLogLogicalInfo (void)
 
bool IsLogicalDecodingEnabled (void)
 
bool IsXLogLogicalInfoEnabled (void)
 
void AtEOXact_LogicalCtl (void)
 
void EnsureLogicalDecodingEnabled (void)
 
void EnableLogicalDecoding (void)
 
void RequestDisableLogicalDecoding (void)
 
void DisableLogicalDecodingIfNecessary (void)
 
void DisableLogicalDecoding (void)
 
void UpdateLogicalDecodingStatusEndOfRecovery (void)
 

Function Documentation

◆ AtEOXact_LogicalCtl()

void AtEOXact_LogicalCtl ( void  )

Definition at line 235 of file logicalctl.c.

236{
237 /* Update the local cache if there is a pending update */
239 {
242 }
243}
static bool XLogLogicalInfoUpdatePending
Definition: logicalctl.c:116
static void update_xlog_logical_info(void)
Definition: logicalctl.c:166

References update_xlog_logical_info(), and XLogLogicalInfoUpdatePending.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ DisableLogicalDecoding()

void DisableLogicalDecoding ( void  )

Definition at line 491 of file logicalctl.c.

492{
493 bool in_recovery = RecoveryInProgress();
494
495 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
496
497 /*
498 * Check if we can disable logical decoding.
499 *
500 * Skip CheckLogicalSlotExists() check during recovery because the
501 * existing slots will be invalidated after disabling logical decoding.
502 */
504 (!in_recovery && CheckLogicalSlotExists()))
505 {
507 LWLockRelease(LogicalDecodingControlLock);
508 return;
509 }
510
512
513 /*
514 * We need to disable logical decoding first and then disable logical
515 * information WAL logging in order to ensure that no logical decoding
516 * processes WAL records with insufficient information.
517 */
519
520 /* Write the WAL to disable logical decoding on standbys too */
521 if (!in_recovery)
523
524 /* Now disable logical information WAL logging */
527
529
530 if (!in_recovery)
531 ereport(LOG,
532 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
533
534 LWLockRelease(LogicalDecodingControlLock);
535
536 /*
537 * Tell all running processes to reflect the xlog_logical_info update.
538 * Unlike when enabling logical decoding, we don't need to wait for all
539 * processes to complete it in this case. We already disabled logical
540 * decoding and it's always safe to write logical information to WAL
541 * records, even when not strictly required. Therefore, we don't need to
542 * wait for all running transactions to finish either.
543 */
545}
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define ereport(elevel,...)
Definition: elog.h:150
static LogicalDecodingCtlData * LogicalDecodingCtl
Definition: logicalctl.c:98
static void write_logical_decoding_status_update_record(bool status)
Definition: logicalctl.c:250
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1178
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1898
@ LW_EXCLUSIVE
Definition: lwlock.h:112
#define START_CRIT_SECTION()
Definition: miscadmin.h:150
#define END_CRIT_SECTION()
Definition: miscadmin.h:152
uint64 EmitProcSignalBarrier(ProcSignalBarrierType type)
Definition: procsignal.c:356
@ PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO
Definition: procsignal.h:57
bool CheckLogicalSlotExists(void)
Definition: slot.c:1612
bool RecoveryInProgress(void)
Definition: xlog.c:6461

References CheckLogicalSlotExists(), EmitProcSignalBarrier(), END_CRIT_SECTION, ereport, errmsg(), LOG, LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), START_CRIT_SECTION, write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by DisableLogicalDecodingIfNecessary(), and xlog_redo().

◆ DisableLogicalDecodingIfNecessary()

void DisableLogicalDecodingIfNecessary ( void  )

Definition at line 460 of file logicalctl.c.

461{
462 bool pending_disable;
463
465 return;
466
467 /*
468 * Sanity check as we cannot disable logical decoding while holding a
469 * logical slot.
470 */
472
473 if (RecoveryInProgress())
474 return;
475
476 LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
477 pending_disable = LogicalDecodingCtl->pending_disable;
478 LWLockRelease(LogicalDecodingControlLock);
479
480 /* Quick return if no pending disable request */
481 if (!pending_disable)
482 return;
483
485}
Assert(PointerIsAligned(start, uint64))
void DisableLogicalDecoding(void)
Definition: logicalctl.c:491
@ LW_SHARED
Definition: lwlock.h:113
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
int wal_level
Definition: xlog.c:134
@ WAL_LEVEL_REPLICA
Definition: xlog.h:76

References Assert(), DisableLogicalDecoding(), LogicalDecodingCtl, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyReplicationSlot, LogicalDecodingCtlData::pending_disable, RecoveryInProgress(), wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckpointerMain().

◆ EnableLogicalDecoding()

void EnableLogicalDecoding ( void  )

Definition at line 340 of file logicalctl.c.

341{
342 bool in_recovery;
343
344 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
345
346 /* Return if it is already enabled */
348 {
350 LWLockRelease(LogicalDecodingControlLock);
351 return;
352 }
353
354 /*
355 * Set logical info WAL logging in shmem. All process starts after this
356 * point will include the information required by logical decoding to WAL
357 * records.
358 */
360
361 LWLockRelease(LogicalDecodingControlLock);
362
363 /*
364 * Tell all running processes to reflect the xlog_logical_info update, and
365 * wait. This ensures that all running processes have enabled logical
366 * information WAL logging.
367 */
370
371 INJECTION_POINT("logical-decoding-activation", NULL);
372
373 in_recovery = RecoveryInProgress();
374
375 /*
376 * There could be some transactions that might have started with the old
377 * status, but we don't need to wait for these transactions to complete as
378 * long as they have valid XIDs. These transactions will appear in the
379 * xl_running_xacts record and therefore the snapshot builder will not try
380 * to decode the transaction during the logical decoding initialization.
381 *
382 * There is a theoretical case where a transaction decides whether to
383 * include logical-info to WAL records before getting an XID. In this
384 * case, the transaction won't appear in xl_running_xacts.
385 *
386 * For operations that do not require an XID assignment, the process
387 * starts including logical-info immediately upon receiving the signal
388 * (barrier). If such an operation checks the effective_wal_level multiple
389 * times within a single execution, the resulting WAL records might be
390 * inconsistent (i.e., logical-info is included in some records but not in
391 * others). However, this is harmless because logical decoding generally
392 * ignores WAL records that are not associated with an assigned XID.
393 *
394 * One might think we need to wait for all running transactions, including
395 * those without XIDs and read-only transactions, to finish before
396 * enabling logical decoding. However, such a requirement would force the
397 * slot creation to wait for a potentially very long time due to
398 * long-running read queries, which is practically unacceptable.
399 */
400
402
403 /*
404 * We enable logical decoding first, followed by writing the WAL record.
405 * This sequence ensures logical decoding becomes available on the primary
406 * first.
407 */
408 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
409
411
412 if (!in_recovery)
414
416
417 LWLockRelease(LogicalDecodingControlLock);
418
420
421 if (!in_recovery)
422 ereport(LOG,
423 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
424}
#define INJECTION_POINT(name, arg)
void WaitForProcSignalBarrier(uint64 generation)
Definition: procsignal.c:424

References EmitProcSignalBarrier(), END_CRIT_SECTION, ereport, errmsg(), INJECTION_POINT, LOG, LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), START_CRIT_SECTION, WaitForProcSignalBarrier(), write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by EnsureLogicalDecodingEnabled(), and xlog_redo().

◆ EnsureLogicalDecodingEnabled()

void EnsureLogicalDecodingEnabled ( void  )

Definition at line 305 of file logicalctl.c.

306{
309
310 /* Logical decoding is always enabled */
312 return;
313
314 if (RecoveryInProgress())
315 {
316 /*
317 * CheckLogicalDecodingRequirements() must have already errored out if
318 * logical decoding is not enabled since we cannot enable the logical
319 * decoding status during recovery.
320 */
322 return;
323 }
324
325 /*
326 * Ensure to abort the activation process in cases where there in an
327 * interruption during the wait.
328 */
330 {
332 }
334}
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:52
bool IsLogicalDecodingEnabled(void)
Definition: logicalctl.c:204
static void abort_logical_decoding_activation(int code, Datum arg)
Definition: logicalctl.c:265
void EnableLogicalDecoding(void)
Definition: logicalctl.c:340
uint64_t Datum
Definition: postgres.h:70
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:77

References abort_logical_decoding_activation(), Assert(), EnableLogicalDecoding(), IsLogicalDecodingEnabled(), MyReplicationSlot, PG_END_ENSURE_ERROR_CLEANUP, PG_ENSURE_ERROR_CLEANUP, RecoveryInProgress(), wal_level, WAL_LEVEL_LOGICAL, and WAL_LEVEL_REPLICA.

Referenced by create_logical_replication_slot(), and CreateReplicationSlot().

◆ InitializeProcessXLogLogicalInfo()

void InitializeProcessXLogLogicalInfo ( void  )

Definition at line 176 of file logicalctl.c.

177{
179}

References update_xlog_logical_info().

Referenced by BaseInit().

◆ IsLogicalDecodingEnabled()

◆ IsXLogLogicalInfoEnabled()

bool IsXLogLogicalInfoEnabled ( void  )

Definition at line 220 of file logicalctl.c.

221{
222 bool xlog_logical_info;
223
224 LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
225 xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
226 LWLockRelease(LogicalDecodingControlLock);
227
228 return xlog_logical_info;
229}

References LogicalDecodingCtl, LW_SHARED, LWLockAcquire(), LWLockRelease(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by show_effective_wal_level(), update_xlog_logical_info(), and UpdateLogicalDecodingStatusEndOfRecovery().

◆ LogicalDecodingCtlShmemInit()

void LogicalDecodingCtlShmemInit ( void  )

Definition at line 129 of file logicalctl.c.

130{
131 bool found;
132
133 LogicalDecodingCtl = ShmemInitStruct("Logical decoding control",
135 &found);
136
137 if (!found)
139}
#define MemSet(start, val, len)
Definition: c.h:1019
Size LogicalDecodingCtlShmemSize(void)
Definition: logicalctl.c:123
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389

References LogicalDecodingCtl, LogicalDecodingCtlShmemSize(), MemSet, and ShmemInitStruct().

Referenced by CreateOrAttachShmemStructs().

◆ LogicalDecodingCtlShmemSize()

Size LogicalDecodingCtlShmemSize ( void  )

Definition at line 123 of file logicalctl.c.

124{
125 return sizeof(LogicalDecodingCtlData);
126}
struct LogicalDecodingCtlData LogicalDecodingCtlData

Referenced by CalculateShmemSize(), and LogicalDecodingCtlShmemInit().

◆ ProcessBarrierUpdateXLogLogicalInfo()

bool ProcessBarrierUpdateXLogLogicalInfo ( void  )

Definition at line 186 of file logicalctl.c.

187{
189 {
190 /* Delay updating XLogLogicalInfo until the transaction end */
192 }
193 else
195
196 return true;
197}
#define InvalidTransactionId
Definition: transam.h:31
TransactionId GetTopTransactionIdIfAny(void)
Definition: xact.c:442

References GetTopTransactionIdIfAny(), InvalidTransactionId, update_xlog_logical_info(), and XLogLogicalInfoUpdatePending.

Referenced by ProcessProcSignalBarrier().

◆ RequestDisableLogicalDecoding()

void RequestDisableLogicalDecoding ( void  )

Definition at line 433 of file logicalctl.c.

434{
436 return;
437
438 /*
439 * It's possible that we might not actually need to disable logical
440 * decoding if someone creates a new logical slot concurrently. We set the
441 * flag anyway and the checkpointer will check it and disable logical
442 * decoding if necessary.
443 */
444 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
446 LWLockRelease(LogicalDecodingControlLock);
447
449
450 elog(DEBUG1, "requested disabling logical decoding");
451}
void WakeupCheckpointer(void)
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226

References DEBUG1, elog, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, WakeupCheckpointer(), wal_level, and WAL_LEVEL_REPLICA.

Referenced by InvalidateObsoleteReplicationSlots(), ReplicationSlotCleanup(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ StartupLogicalDecodingStatus()

void StartupLogicalDecodingStatus ( bool  last_status)

Definition at line 146 of file logicalctl.c.

147{
148 /* Logical decoding is always disabled when 'minimal' WAL level */
150 return;
151
152 /*
153 * Set the initial logical decoding status based on the last status. If
154 * logical decoding was enabled before the last shutdown, it remains
155 * enabled as we might have set wal_level='logical' or have at least one
156 * logical slot.
157 */
160}
@ WAL_LEVEL_MINIMAL
Definition: xlog.h:75

References LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, wal_level, WAL_LEVEL_MINIMAL, and LogicalDecodingCtlData::xlog_logical_info.

Referenced by StartupXLOG().

◆ UpdateLogicalDecodingStatusEndOfRecovery()

void UpdateLogicalDecodingStatusEndOfRecovery ( void  )

Definition at line 553 of file logicalctl.c.

554{
555 bool new_status = false;
556
558
559 /*
560 * With 'minimal' WAL level, there are no logical replication slots during
561 * recovery. Logical decoding is always disabled, so there is no need to
562 * synchronize XLogLogicalInfo.
563 */
565 {
567 return;
568 }
569
570 LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
571
573 new_status = true;
574
575 /*
576 * When recovery ends, we need to either enable or disable logical
577 * decoding based on the wal_level setting and the presence of logical
578 * slots. We need to note that concurrent slot creation and deletion could
579 * happen but WAL writes are still not permitted until recovery fully
580 * completes. Here's how we handle concurrent toggling of logical
581 * decoding:
582 *
583 * For 'enable' case, if there's a concurrent disable request before
584 * recovery fully completes, the checkpointer will handle it after
585 * recovery is done. This means there might be a brief period after
586 * recovery where logical decoding remains enabled even with no logical
587 * replication slots present. This temporary state is not new - it can
588 * already occur due to the checkpointer's asynchronous deactivation
589 * process.
590 *
591 * For 'disable' case, backend cannot create logical replication slots
592 * during recovery (see checks in CheckLogicalDecodingRequirements()),
593 * which prevents a race condition between disabling logical decoding and
594 * concurrent slot creation.
595 */
597 {
598 /*
599 * Update both the logical decoding status and logical WAL logging
600 * status. Unlike toggling these status during non-recovery, we don't
601 * need to worry about the operation order as WAL writes are still not
602 * permitted.
603 */
606
607 elog(DEBUG1,
608 "update logical decoding status to %d at the end of recovery",
609 new_status);
610
611 /*
612 * Now that we updated the logical decoding status, clear the pending
613 * disable flag. It's possible that a concurrent process drops the
614 * last logical slot and initiates the pending disable again. The
615 * checkpointer process will check it.
616 */
618
619 LWLockRelease(LogicalDecodingControlLock);
620
622 }
623 else
624 LWLockRelease(LogicalDecodingControlLock);
625
626 /*
627 * Ensure all running processes have the updated status. We don't need to
628 * wait for running transactions to finish as we don't accept any writes
629 * yet. On the other hand, we need to wait for synchronizing
630 * XLogLogicalInfo even if we've not updated the status above as the
631 * status have been turned on and off during recovery, having running
632 * processes have different status on their local caches.
633 */
637
638 INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
639}
bool IsUnderPostmaster
Definition: globals.c:120
bool IsXLogLogicalInfoEnabled(void)
Definition: logicalctl.c:220

References Assert(), CheckLogicalSlotExists(), DEBUG1, elog, EmitProcSignalBarrier(), INJECTION_POINT, IsLogicalDecodingEnabled(), IsUnderPostmaster, IsXLogLogicalInfoEnabled(), LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), WaitForProcSignalBarrier(), wal_level, WAL_LEVEL_LOGICAL, WAL_LEVEL_MINIMAL, write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by StartupXLOG().