PostgreSQL Source Code git master
Loading...
Searching...
No Matches
logicalctl.c File Reference
#include "postgres.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "miscadmin.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/subsystems.h"
#include "utils/injection_point.h"
Include dependency graph for logicalctl.c:

Go to the source code of this file.

Data Structures

struct  LogicalDecodingCtlData
 

Typedefs

typedef struct LogicalDecodingCtlData LogicalDecodingCtlData
 

Functions

static void LogicalDecodingCtlShmemRequest (void *arg)
 
static void update_xlog_logical_info (void)
 
static void abort_logical_decoding_activation (int code, Datum arg)
 
static void write_logical_decoding_status_update_record (bool status)
 
void StartupLogicalDecodingStatus (bool last_status)
 
void InitializeProcessXLogLogicalInfo (void)
 
bool ProcessBarrierUpdateXLogLogicalInfo (void)
 
bool IsLogicalDecodingEnabled (void)
 
bool IsXLogLogicalInfoEnabled (void)
 
void AtEOXact_LogicalCtl (void)
 
void EnsureLogicalDecodingEnabled (void)
 
void EnableLogicalDecoding (void)
 
void RequestDisableLogicalDecoding (void)
 
void DisableLogicalDecodingIfNecessary (void)
 
void DisableLogicalDecoding (void)
 
void UpdateLogicalDecodingStatusEndOfRecovery (void)
 

Variables

static LogicalDecodingCtlData * LogicalDecodingCtl = NULL
 
const ShmemCallbacks LogicalDecodingCtlShmemCallbacks
 
bool XLogLogicalInfo = false
 
static bool XLogLogicalInfoUpdatePending = false
 

Typedef Documentation

◆ LogicalDecodingCtlData

Function Documentation

◆ abort_logical_decoding_activation()

static void abort_logical_decoding_activation ( int  code,
Datum  arg 
)
static

Definition at line 268 of file logicalctl.c.

269{
270 elog(DEBUG1, "aborting logical decoding activation process");
272}
#define DEBUG1
Definition elog.h:31
#define elog(elevel,...)
Definition elog.h:228
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:423

References DEBUG1, elog, and RequestDisableLogicalDecoding().

Referenced by EnsureLogicalDecodingEnabled().

◆ AtEOXact_LogicalCtl()

void AtEOXact_LogicalCtl ( void  )

Definition at line 233 of file logicalctl.c.

234{
235 /* Update the local cache if there is a pending update */
237 {
240 }
241}
static bool XLogLogicalInfoUpdatePending
Definition logicalctl.c:124
static void update_xlog_logical_info(void)
Definition logicalctl.c:164

References update_xlog_logical_info(), and XLogLogicalInfoUpdatePending.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ DisableLogicalDecoding()

void DisableLogicalDecoding ( void  )

Definition at line 481 of file logicalctl.c.

482{
484 bool was_enabled;
485
487
488 /*
489 * Check if we can disable logical decoding.
490 *
491 * Nothing to do if both flags are already off, or if valid slots exist
492 * (skip the slot check during recovery because the existing slots will be
493 * invalidated after disabling logical decoding.)
494 */
498 {
501 return;
502 }
503
504 /*
505 * Remember if logical decoding was enabled. An interrupted activation can
506 * leave xlog_logical_info=true while logical_decoding_enabled remains
507 * false.
508 */
510
512
513 /*
514 * We need to disable logical decoding first and then disable logical
515 * information WAL logging in order to ensure that no logical decoding
516 * processes WAL records with insufficient information.
517 */
519
520 /* Write the WAL to disable logical decoding on standbys too */
521 if (!in_recovery && was_enabled)
523
524 /* Now disable logical information WAL logging */
527
529
530 /*
531 * Logging under the lock guarantees our "is disabled" message appears in
532 * the server log before its eventual "is enabled", making server log
533 * diagnostics easy.
534 */
535 if (!in_recovery && was_enabled)
536 ereport(LOG,
537 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
538
540
541 /*
542 * Tell all running processes to reflect the xlog_logical_info update.
543 * Unlike when enabling logical decoding, we don't need to wait for all
544 * processes to complete it in this case. We already disabled logical
545 * decoding and it's always safe to write logical information to WAL
546 * records, even when not strictly required. Therefore, we don't need to
547 * wait for all running transactions to finish either.
548 */
550}
#define LOG
Definition elog.h:32
#define ereport(elevel,...)
Definition elog.h:152
static LogicalDecodingCtlData * LogicalDecodingCtl
Definition logicalctl.c:100
static void write_logical_decoding_status_update_record(bool status)
Definition logicalctl.c:248
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
#define START_CRIT_SECTION()
Definition miscadmin.h:152
#define END_CRIT_SECTION()
Definition miscadmin.h:154
static char * errmsg
static int fb(int x)
uint64 EmitProcSignalBarrier(ProcSignalBarrierType type)
Definition procsignal.c:368
@ PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO
Definition procsignal.h:51
bool CheckLogicalSlotExists(void)
Definition slot.c:1619
bool RecoveryInProgress(void)
Definition xlog.c:6834

References CheckLogicalSlotExists(), EmitProcSignalBarrier(), END_CRIT_SECTION, ereport, errmsg, fb(), LOG, LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), START_CRIT_SECTION, write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by DisableLogicalDecodingIfNecessary(), and xlog_redo().

◆ DisableLogicalDecodingIfNecessary()

void DisableLogicalDecodingIfNecessary ( void  )

Definition at line 450 of file logicalctl.c.

451{
452 bool pending_disable;
453
455 return;
456
457 /*
458 * Sanity check as we cannot disable logical decoding while holding a
459 * logical slot.
460 */
462
463 if (RecoveryInProgress())
464 return;
465
467 pending_disable = LogicalDecodingCtl->pending_disable;
469
470 /* Quick return if no pending disable request */
471 if (!pending_disable)
472 return;
473
475}
#define Assert(condition)
Definition c.h:943
void DisableLogicalDecoding(void)
Definition logicalctl.c:481
@ LW_SHARED
Definition lwlock.h:105
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
int wal_level
Definition xlog.c:138
@ WAL_LEVEL_REPLICA
Definition xlog.h:77

References Assert, DisableLogicalDecoding(), fb(), LogicalDecodingCtl, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyReplicationSlot, LogicalDecodingCtlData::pending_disable, RecoveryInProgress(), wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckpointerMain().

◆ EnableLogicalDecoding()

void EnableLogicalDecoding ( void  )

Definition at line 324 of file logicalctl.c.

325{
326 bool in_recovery;
327
329
330 /* Return if it is already enabled */
332 {
335 return;
336 }
337
338 /*
339 * Set logical info WAL logging in shmem. All processes started after this
340 * point will include the information required by logical decoding in WAL
341 * records.
342 */
344
346
347 /*
348 * Tell all running processes to reflect the xlog_logical_info update, and
349 * wait. This ensures that all running processes have enabled logical
350 * information WAL logging.
351 */
354
355 INJECTION_POINT("logical-decoding-activation", NULL);
356
358
359 /*
360 * There could be some transactions that might have started with the old
361 * status, but we don't need to wait for these transactions to complete as
362 * long as they have valid XIDs. These transactions will appear in the
363 * xl_running_xacts record and therefore the snapshot builder will not try
364 * to decode the transaction during the logical decoding initialization.
365 *
366 * There is a theoretical case where a transaction decides whether to
367 * include logical-info to WAL records before getting an XID. In this
368 * case, the transaction won't appear in xl_running_xacts.
369 *
370 * For operations that do not require an XID assignment, the process
371 * starts including logical-info immediately upon receiving the signal
372 * (barrier). If such an operation checks the effective_wal_level multiple
373 * times within a single execution, the resulting WAL records might be
374 * inconsistent (i.e., logical-info is included in some records but not in
375 * others). However, this is harmless because logical decoding generally
376 * ignores WAL records that are not associated with an assigned XID.
377 *
378 * One might think we need to wait for all running transactions, including
379 * those without XIDs and read-only transactions, to finish before
380 * enabling logical decoding. However, such a requirement would force the
381 * slot creation to wait for a potentially very long time due to
382 * long-running read queries, which is practically unacceptable.
383 */
384
386
388
389 /*
390 * We enable logical decoding first, followed by writing the WAL record.
391 * This sequence ensures logical decoding becomes available on the primary
392 * first.
393 */
395
396 if (!in_recovery)
398
400
402
404
405 /*
406 * We log the activation message after releasing the slot lock. This is
407 * safe because the activation is performed while holding a logical slot,
408 * meaning, a concurrent deactivation cannot interleave its log message
409 * ahead of ours.
410 */
411 if (!in_recovery)
412 ereport(LOG,
413 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
414}
#define INJECTION_POINT(name, arg)
void WaitForProcSignalBarrier(uint64 generation)
Definition procsignal.c:436

References EmitProcSignalBarrier(), END_CRIT_SECTION, ereport, errmsg, fb(), INJECTION_POINT, LOG, LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), START_CRIT_SECTION, WaitForProcSignalBarrier(), write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by EnsureLogicalDecodingEnabled(), and xlog_redo().

◆ EnsureLogicalDecodingEnabled()

void EnsureLogicalDecodingEnabled ( void  )

Definition at line 289 of file logicalctl.c.

290{
293
294 /* Logical decoding is always enabled */
296 return;
297
298 if (RecoveryInProgress())
299 {
300 /*
301 * CheckLogicalDecodingRequirements() must have already errored out if
302 * logical decoding is not enabled since we cannot enable the logical
303 * decoding status during recovery.
304 */
306 return;
307 }
308
309 /*
310 * Ensure that the activation process is aborted in cases where there is an
311 * interruption during the wait.
312 */
314 {
316 }
318}
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition ipc.h:52
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:202
static void abort_logical_decoding_activation(int code, Datum arg)
Definition logicalctl.c:268
void EnableLogicalDecoding(void)
Definition logicalctl.c:324
uint64_t Datum
Definition postgres.h:70
@ WAL_LEVEL_LOGICAL
Definition xlog.h:78

References abort_logical_decoding_activation(), Assert, EnableLogicalDecoding(), IsLogicalDecodingEnabled(), MyReplicationSlot, PG_END_ENSURE_ERROR_CLEANUP, PG_ENSURE_ERROR_CLEANUP, RecoveryInProgress(), wal_level, WAL_LEVEL_LOGICAL, and WAL_LEVEL_REPLICA.

Referenced by create_logical_replication_slot(), CreateReplicationSlot(), and repack_setup_logical_decoding().

◆ InitializeProcessXLogLogicalInfo()

void InitializeProcessXLogLogicalInfo ( void  )

Definition at line 174 of file logicalctl.c.

175{
177}

References update_xlog_logical_info().

Referenced by AuxiliaryProcessMainCommon(), and InitPostgres().

◆ IsLogicalDecodingEnabled()

◆ IsXLogLogicalInfoEnabled()

bool IsXLogLogicalInfoEnabled ( void  )

◆ LogicalDecodingCtlShmemRequest()

static void LogicalDecodingCtlShmemRequest ( void *  arg )
static

Definition at line 131 of file logicalctl.c.

132{
133 ShmemRequestStruct(.name = "Logical decoding control",
134 .size = sizeof(LogicalDecodingCtlData),
135 .ptr = (void **) &LogicalDecodingCtl,
136 );
137}
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References LogicalDecodingCtl, name, and ShmemRequestStruct.

◆ ProcessBarrierUpdateXLogLogicalInfo()

bool ProcessBarrierUpdateXLogLogicalInfo ( void  )

Definition at line 184 of file logicalctl.c.

185{
187 {
188 /* Delay updating XLogLogicalInfo until the transaction end */
190 }
191 else
193
194 return true;
195}
#define InvalidTransactionId
Definition transam.h:31
TransactionId GetTopTransactionIdIfAny(void)
Definition xact.c:443

References GetTopTransactionIdIfAny(), InvalidTransactionId, update_xlog_logical_info(), and XLogLogicalInfoUpdatePending.

Referenced by ProcessProcSignalBarrier().

◆ RequestDisableLogicalDecoding()

void RequestDisableLogicalDecoding ( void  )

Definition at line 423 of file logicalctl.c.

424{
426 return;
427
428 /*
429 * It's possible that we might not actually need to disable logical
430 * decoding if someone creates a new logical slot concurrently. We set the
431 * flag anyway and the checkpointer will check it and disable logical
432 * decoding if necessary.
433 */
437
439
440 elog(DEBUG1, "requested disabling logical decoding");
441}
void WakeupCheckpointer(void)

References DEBUG1, elog, fb(), LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, WakeupCheckpointer(), wal_level, and WAL_LEVEL_REPLICA.

Referenced by abort_logical_decoding_activation(), InvalidateObsoleteReplicationSlots(), ReplicationSlotCleanup(), ReplicationSlotDropAcquired(), and ReplicationSlotsDropDBSlots().

◆ StartupLogicalDecodingStatus()

void StartupLogicalDecodingStatus ( bool  last_status)

Definition at line 144 of file logicalctl.c.

145{
146 /* Logical decoding is always disabled with the 'minimal' WAL level */
148 return;
149
150 /*
151 * Set the initial logical decoding status based on the last status. If
152 * logical decoding was enabled before the last shutdown, it remains
153 * enabled as we might have set wal_level='logical' or have at least one
154 * logical slot.
155 */
158}
@ WAL_LEVEL_MINIMAL
Definition xlog.h:76

References fb(), LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, wal_level, WAL_LEVEL_MINIMAL, and LogicalDecodingCtlData::xlog_logical_info.

Referenced by StartupXLOG().

◆ update_xlog_logical_info()

static void update_xlog_logical_info ( void  )
inline static

Definition at line 164 of file logicalctl.c.

165{
167}
bool XLogLogicalInfo
Definition logicalctl.c:115
bool IsXLogLogicalInfoEnabled(void)
Definition logicalctl.c:218

References IsXLogLogicalInfoEnabled(), and XLogLogicalInfo.

Referenced by AtEOXact_LogicalCtl(), InitializeProcessXLogLogicalInfo(), and ProcessBarrierUpdateXLogLogicalInfo().

◆ UpdateLogicalDecodingStatusEndOfRecovery()

void UpdateLogicalDecodingStatusEndOfRecovery ( void  )

Definition at line 558 of file logicalctl.c.

559{
560 bool new_status = false;
561
563
564 /*
565 * With 'minimal' WAL level, there are no logical replication slots during
566 * recovery. Logical decoding is always disabled, so there is no need to
567 * synchronize XLogLogicalInfo.
568 */
570 {
572 return;
573 }
574
576
578 new_status = true;
579
580 /*
581 * When recovery ends, we need to either enable or disable logical
582 * decoding based on the wal_level setting and the presence of logical
583 * slots. We need to note that concurrent slot creation and deletion could
584 * happen but WAL writes are still not permitted until recovery fully
585 * completes. Here's how we handle concurrent toggling of logical
586 * decoding:
587 *
588 * For 'enable' case, if there's a concurrent disable request before
589 * recovery fully completes, the checkpointer will handle it after
590 * recovery is done. This means there might be a brief period after
591 * recovery where logical decoding remains enabled even with no logical
592 * replication slots present. This temporary state is not new - it can
593 * already occur due to the checkpointer's asynchronous deactivation
594 * process.
595 *
596 * For 'disable' case, backend cannot create logical replication slots
597 * during recovery (see checks in CheckLogicalDecodingRequirements()),
598 * which prevents a race condition between disabling logical decoding and
599 * concurrent slot creation.
600 */
602 {
603 /*
604 * Update both the logical decoding status and logical WAL logging
605 * status. Unlike when toggling these statuses outside of recovery, we
606 * don't need to worry about the operation order as WAL writes are still
607 * not permitted.
608 */
611
612 elog(DEBUG1,
613 "update logical decoding status to %d at the end of recovery",
614 new_status);
615
616 /*
617 * Now that we updated the logical decoding status, clear the pending
618 * disable flag. It's possible that a concurrent process drops the
619 * last logical slot and initiates the pending disable again. The
620 * checkpointer process will check it.
621 */
623
625
627 }
628 else
630
631 /*
632 * Ensure all running processes have the updated status. We don't need to
633 * wait for running transactions to finish as we don't accept any writes
634 * yet. On the other hand, we need to wait for XLogLogicalInfo to be
635 * synchronized even if we've not updated the status above, because the
636 * status may have been turned on and off during recovery, leaving running
637 * processes with different statuses in their local caches.
638 */
642
643 INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
644}
bool IsUnderPostmaster
Definition globals.c:122

References Assert, CheckLogicalSlotExists(), DEBUG1, elog, EmitProcSignalBarrier(), fb(), INJECTION_POINT, IsLogicalDecodingEnabled(), IsUnderPostmaster, IsXLogLogicalInfoEnabled(), LogicalDecodingCtlData::logical_decoding_enabled, LogicalDecodingCtl, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LogicalDecodingCtlData::pending_disable, PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, RecoveryInProgress(), WaitForProcSignalBarrier(), wal_level, WAL_LEVEL_LOGICAL, WAL_LEVEL_MINIMAL, write_logical_decoding_status_update_record(), and LogicalDecodingCtlData::xlog_logical_info.

Referenced by StartupXLOG().

◆ write_logical_decoding_status_update_record()

static void write_logical_decoding_status_update_record ( bool  status)
static

Definition at line 248 of file logicalctl.c.

249{
251
253 XLogRegisterData(&status, sizeof(bool));
256}
#define XLOG_LOGICAL_DECODING_STATUS_CHANGE
Definition pg_control.h:87
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
uint64 XLogRecPtr
Definition xlogdefs.h:21
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:482
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:372
void XLogBeginInsert(void)
Definition xloginsert.c:153

References fb(), XLOG_LOGICAL_DECODING_STATUS_CHANGE, XLogBeginInsert(), XLogFlush(), XLogInsert(), and XLogRegisterData().

Referenced by DisableLogicalDecoding(), EnableLogicalDecoding(), and UpdateLogicalDecodingStatusEndOfRecovery().

Variable Documentation

◆ LogicalDecodingCtl

◆ LogicalDecodingCtlShmemCallbacks

const ShmemCallbacks LogicalDecodingCtlShmemCallbacks
Initial value:
= {
.request_fn = LogicalDecodingCtlShmemRequest,
}
static void LogicalDecodingCtlShmemRequest(void *arg)
Definition logicalctl.c:131

Definition at line 104 of file logicalctl.c.

104 {
105 .request_fn = LogicalDecodingCtlShmemRequest,
106};

◆ XLogLogicalInfo

bool XLogLogicalInfo = false

Definition at line 115 of file logicalctl.c.

Referenced by update_xlog_logical_info().

◆ XLogLogicalInfoUpdatePending

bool XLogLogicalInfoUpdatePending = false
static

Definition at line 124 of file logicalctl.c.

Referenced by AtEOXact_LogicalCtl(), and ProcessBarrierUpdateXLogLogicalInfo().