PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
slot.h File Reference
Include dependency graph for slot.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ReplicationSlotPersistentData
 
struct  ReplicationSlot
 
struct  ReplicationSlotCtlData
 

Macros

#define PG_REPLSLOT_DIR   "pg_replslot"
 
#define RS_INVAL_MAX_CAUSES   4
 
#define SlotIsPhysical(slot)   ((slot)->data.database == InvalidOid)
 
#define SlotIsLogical(slot)   ((slot)->data.database != InvalidOid)
 

Typedefs

typedef enum ReplicationSlotPersistency ReplicationSlotPersistency
 
typedef enum ReplicationSlotInvalidationCause ReplicationSlotInvalidationCause
 
typedef struct ReplicationSlotPersistentData ReplicationSlotPersistentData
 
typedef struct ReplicationSlot ReplicationSlot
 
typedef struct ReplicationSlotCtlData ReplicationSlotCtlData
 

Enumerations

enum  ReplicationSlotPersistency { RS_PERSISTENT , RS_EPHEMERAL , RS_TEMPORARY }
 
enum  ReplicationSlotInvalidationCause {
  RS_INVAL_NONE = 0 , RS_INVAL_WAL_REMOVED = (1 << 0) , RS_INVAL_HORIZON = (1 << 1) , RS_INVAL_WAL_LEVEL = (1 << 2) ,
  RS_INVAL_IDLE_TIMEOUT = (1 << 3)
}
 

Functions

static void ReplicationSlotSetInactiveSince (ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
 
Size ReplicationSlotsShmemSize (void)
 
void ReplicationSlotsShmemInit (void)
 
void ReplicationSlotCreate (const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
 
void ReplicationSlotPersist (void)
 
void ReplicationSlotDrop (const char *name, bool nowait)
 
void ReplicationSlotDropAcquired (void)
 
void ReplicationSlotAlter (const char *name, const bool *failover, const bool *two_phase)
 
void ReplicationSlotAcquire (const char *name, bool nowait, bool error_if_invalid)
 
void ReplicationSlotRelease (void)
 
void ReplicationSlotCleanup (bool synced_only)
 
void ReplicationSlotSave (void)
 
void ReplicationSlotMarkDirty (void)
 
void ReplicationSlotInitialize (void)
 
bool ReplicationSlotValidateName (const char *name, int elevel)
 
void ReplicationSlotReserveWal (void)
 
void ReplicationSlotsComputeRequiredXmin (bool already_locked)
 
void ReplicationSlotsComputeRequiredLSN (void)
 
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN (void)
 
bool ReplicationSlotsCountDBSlots (Oid dboid, int *nslots, int *nactive)
 
void ReplicationSlotsDropDBSlots (Oid dboid)
 
bool InvalidateObsoleteReplicationSlots (uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
 
ReplicationSlot * SearchNamedReplicationSlot (const char *name, bool need_lock)
 
int ReplicationSlotIndex (ReplicationSlot *slot)
 
bool ReplicationSlotName (int index, Name name)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
void ReplicationSlotDropAtPubNode (WalReceiverConn *wrconn, char *slotname, bool missing_ok)
 
void StartupReplicationSlots (void)
 
void CheckPointReplicationSlots (bool is_shutdown)
 
void CheckSlotRequirements (void)
 
void CheckSlotPermissions (void)
 
ReplicationSlotInvalidationCause GetSlotInvalidationCause (const char *cause_name)
 
const char * GetSlotInvalidationCauseName (ReplicationSlotInvalidationCause cause)
 
bool SlotExistsInSyncStandbySlots (const char *slot_name)
 
bool StandbySlotsHaveCaughtup (XLogRecPtr wait_for_lsn, int elevel)
 
void WaitForStandbyConfirmation (XLogRecPtr wait_for_lsn)
 

Variables

PGDLLIMPORT ReplicationSlotCtlData * ReplicationSlotCtl
 
PGDLLIMPORT ReplicationSlot * MyReplicationSlot
 
PGDLLIMPORT int max_replication_slots
 
PGDLLIMPORT char * synchronized_standby_slots
 
PGDLLIMPORT int idle_replication_slot_timeout_mins
 

Macro Definition Documentation

◆ PG_REPLSLOT_DIR

#define PG_REPLSLOT_DIR   "pg_replslot"

Definition at line 21 of file slot.h.

◆ RS_INVAL_MAX_CAUSES

#define RS_INVAL_MAX_CAUSES   4

Definition at line 65 of file slot.h.

◆ SlotIsLogical

#define SlotIsLogical (   slot)    ((slot)->data.database != InvalidOid)

Definition at line 221 of file slot.h.

◆ SlotIsPhysical

#define SlotIsPhysical (   slot)    ((slot)->data.database == InvalidOid)

Definition at line 220 of file slot.h.

Typedef Documentation

◆ ReplicationSlot

◆ ReplicationSlotCtlData

◆ ReplicationSlotInvalidationCause

◆ ReplicationSlotPersistency

◆ ReplicationSlotPersistentData

Enumeration Type Documentation

◆ ReplicationSlotInvalidationCause

Enumerator
RS_INVAL_NONE 
RS_INVAL_WAL_REMOVED 
RS_INVAL_HORIZON 
RS_INVAL_WAL_LEVEL 
RS_INVAL_IDLE_TIMEOUT 

Definition at line 51 of file slot.h.

52{
53 RS_INVAL_NONE = 0,
54 /* required WAL has been removed */
55 RS_INVAL_WAL_REMOVED = (1 << 0),
56 /* required rows have been removed */
57 RS_INVAL_HORIZON = (1 << 1),
58 /* wal_level insufficient for slot */
59 RS_INVAL_WAL_LEVEL = (1 << 2),
60 /* idle slot timeout has occurred */
61 RS_INVAL_IDLE_TIMEOUT = (1 << 3),
62} ReplicationSlotInvalidationCause;
ReplicationSlotInvalidationCause
Definition: slot.h:52
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:55
@ RS_INVAL_IDLE_TIMEOUT
Definition: slot.h:61
@ RS_INVAL_HORIZON
Definition: slot.h:57
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:59
@ RS_INVAL_NONE
Definition: slot.h:53

◆ ReplicationSlotPersistency

Enumerator
RS_PERSISTENT 
RS_EPHEMERAL 
RS_TEMPORARY 

Definition at line 36 of file slot.h.

37{
38 RS_PERSISTENT,
39 RS_EPHEMERAL,
40 RS_TEMPORARY,
41} ReplicationSlotPersistency;
ReplicationSlotPersistency
Definition: slot.h:37
@ RS_PERSISTENT
Definition: slot.h:38
@ RS_EPHEMERAL
Definition: slot.h:39
@ RS_TEMPORARY
Definition: slot.h:40

Function Documentation

◆ CheckPointReplicationSlots()

void CheckPointReplicationSlots ( bool  is_shutdown)

Definition at line 2032 of file slot.c.

2033{
2034 int i;
2035
2036 elog(DEBUG1, "performing replication slot checkpoint");
2037
2038 /*
2039 * Prevent any slot from being created/dropped while we're active. As we
2040 * explicitly do *not* want to block iterating over replication_slots or
2041 * acquiring a slot we cannot take the control lock - but that's OK,
2042 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2043 * enough to guarantee that nobody can change the in_use bits on us.
2044 */
2045 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2046
2047 for (i = 0; i < max_replication_slots; i++)
2048 {
2049 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2050 char path[MAXPGPATH];
2051
2052 if (!s->in_use)
2053 continue;
2054
2055 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2056 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2057
2058 /*
2059 * Slot's data is not flushed each time the confirmed_flush LSN is
2060 * updated as that could lead to frequent writes. However, we decide
2061 * to force a flush of all logical slot's data at the time of shutdown
2062 * if the confirmed_flush LSN is changed since we last flushed it to
2063 * disk. This helps in avoiding an unnecessary retreat of the
2064 * confirmed_flush LSN after restart.
2065 */
2066 if (is_shutdown && SlotIsLogical(s))
2067 {
2069
2070 if (s->data.invalidated == RS_INVAL_NONE &&
2072 {
2073 s->just_dirtied = true;
2074 s->dirty = true;
2075 }
2077 }
2078
2079 SaveSlotToPath(s, path, LOG);
2080 }
2081 LWLockRelease(ReplicationSlotAllocationLock);
2082}
#define NameStr(name)
Definition: c.h:717
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
int i
Definition: isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_SHARED
Definition: lwlock.h:115
#define MAXPGPATH
#define sprintf
Definition: port.h:241
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:2211
int max_replication_slots
Definition: slot.c:150
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:144
#define PG_REPLSLOT_DIR
Definition: slot.h:21
#define SlotIsLogical(slot)
Definition: slot.h:221
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
ReplicationSlot replication_slots[1]
Definition: slot.h:232
XLogRecPtr confirmed_flush
Definition: slot.h:111
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:103
slock_t mutex
Definition: slot.h:158
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:210
bool in_use
Definition: slot.h:161
bool just_dirtied
Definition: slot.h:167
bool dirty
Definition: slot.h:168
ReplicationSlotPersistentData data
Definition: slot.h:185

References ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, DEBUG1, ReplicationSlot::dirty, elog, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, LOG, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SaveSlotToPath(), SlotIsLogical, SpinLockAcquire, SpinLockRelease, and sprintf.

Referenced by CheckPointGuts().

◆ CheckSlotPermissions()

void CheckSlotPermissions ( void  )

Definition at line 1435 of file slot.c.

1436{
1437 if (!has_rolreplication(GetUserId()))
1438 ereport(ERROR,
1439 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1440 errmsg("permission denied to use replication slots"),
1441 errdetail("Only roles with the %s attribute may use replication slots.",
1442 "REPLICATION")));
1443}
int errdetail(const char *fmt,...)
Definition: elog.c:1204
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
Oid GetUserId(void)
Definition: miscinit.c:520
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:739

References ereport, errcode(), errdetail(), errmsg(), ERROR, GetUserId(), and has_rolreplication().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), and pg_sync_replication_slots().

◆ CheckSlotRequirements()

void CheckSlotRequirements ( void  )

Definition at line 1413 of file slot.c.

1414{
1415 /*
1416 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1417 * needs the same check.
1418 */
1419
1420 if (max_replication_slots == 0)
1421 ereport(ERROR,
1422 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1423 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1424
1425 if (wal_level < WAL_LEVEL_REPLICA)
1426 ereport(ERROR,
1427 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1428 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1429}
int wal_level
Definition: xlog.c:131
@ WAL_LEVEL_REPLICA
Definition: xlog.h:75

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckLogicalDecodingRequirements(), copy_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

◆ GetSlotInvalidationCause()

ReplicationSlotInvalidationCause GetSlotInvalidationCause ( const char *  cause_name)

Definition at line 2607 of file slot.c.

2608{
2609 Assert(cause_name);
2610
2611 /* Search lookup table for the cause having this name */
2612 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2613 {
2614 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2615 return SlotInvalidationCauses[i].cause;
2616 }
2617
2618 Assert(false);
2619 return RS_INVAL_NONE; /* to keep compiler quiet */
2620}
Assert(PointerIsAligned(start, uint64))
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition: slot.c:112
#define RS_INVAL_MAX_CAUSES
Definition: slot.h:65
ReplicationSlotInvalidationCause cause
Definition: slot.c:108

References Assert(), SlotInvalidationCauseMap::cause, i, RS_INVAL_MAX_CAUSES, RS_INVAL_NONE, and SlotInvalidationCauses.

Referenced by synchronize_slots().

◆ GetSlotInvalidationCauseName()

const char * GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause  cause)

Definition at line 2627 of file slot.c.

2628{
2629 /* Search lookup table for the name of this cause */
2630 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2631 {
2632 if (SlotInvalidationCauses[i].cause == cause)
2633 return SlotInvalidationCauses[i].cause_name;
2634 }
2635
2636 Assert(false);
2637 return "none"; /* to keep compiler quiet */
2638}
const char * cause_name
Definition: slot.c:109

References Assert(), SlotInvalidationCauseMap::cause_name, i, RS_INVAL_MAX_CAUSES, and SlotInvalidationCauses.

Referenced by pg_get_replication_slots(), and ReplicationSlotAcquire().

◆ InvalidateObsoleteReplicationSlots()

bool InvalidateObsoleteReplicationSlots ( uint32  possible_causes,
XLogSegNo  oldestSegno,
Oid  dboid,
TransactionId  snapshotConflictHorizon 
)

Definition at line 1976 of file slot.c.

1979{
1980 XLogRecPtr oldestLSN;
1981 bool invalidated = false;
1982
1983 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
1984 Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
1985 Assert(possible_causes != RS_INVAL_NONE);
1986
1987 if (max_replication_slots == 0)
1988 return invalidated;
1989
1990 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1991
1992restart:
1993 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1994 for (int i = 0; i < max_replication_slots; i++)
1995 {
1996 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1997
1998 if (!s->in_use)
1999 continue;
2000
2001 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2002 snapshotConflictHorizon,
2003 &invalidated))
2004 {
2005 /* if the lock was released, start from scratch */
2006 goto restart;
2007 }
2008 }
2009 LWLockRelease(ReplicationSlotControlLock);
2010
2011 /*
2012 * If any slots have been invalidated, recalculate the resource limits.
2013 */
2014 if (invalidated)
2015 {
2016 ReplicationSlotsComputeRequiredXmin(false);
2017 ReplicationSlotsComputeRequiredLSN();
2018 }
2019
2020 return invalidated;
2021}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1100
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1156
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
Definition: slot.c:1718
#define TransactionIdIsValid(xid)
Definition: transam.h:41
int wal_segment_size
Definition: xlog.c:143
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), i, ReplicationSlot::in_use, InvalidatePossiblyObsoleteSlot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, TransactionIdIsValid, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by CreateCheckPoint(), CreateRestartPoint(), ResolveRecoveryConflictWithSnapshot(), and xlog_redo().

◆ ReplicationSlotAcquire()

void ReplicationSlotAcquire ( const char *  name,
bool  nowait,
bool  error_if_invalid 
)

Definition at line 559 of file slot.c.

560{
562 int active_pid;
563
564 Assert(name != NULL);
565
566retry:
567 Assert(MyReplicationSlot == NULL);
568
569 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
570
571 /* Check if the slot exists with the given name. */
573 if (s == NULL || !s->in_use)
574 {
575 LWLockRelease(ReplicationSlotControlLock);
576
578 (errcode(ERRCODE_UNDEFINED_OBJECT),
579 errmsg("replication slot \"%s\" does not exist",
580 name)));
581 }
582
583 /*
584 * This is the slot we want; check if it's active under some other
585 * process. In single user mode, we don't need this check.
586 */
588 {
589 /*
590 * Get ready to sleep on the slot in case it is active. (We may end
591 * up not sleeping, but we don't want to do this while holding the
592 * spinlock.)
593 */
594 if (!nowait)
596
597 /*
598 * It is important to reset the inactive_since under spinlock here to
599 * avoid race conditions with slot invalidation. See comments related
600 * to inactive_since in InvalidatePossiblyObsoleteSlot.
601 */
603 if (s->active_pid == 0)
605 active_pid = s->active_pid;
608 }
609 else
610 {
611 active_pid = MyProcPid;
613 }
614 LWLockRelease(ReplicationSlotControlLock);
615
616 /*
617 * If we found the slot but it's already active in another process, we
618 * wait until the owning process signals us that it's been released, or
619 * error out.
620 */
621 if (active_pid != MyProcPid)
622 {
623 if (!nowait)
624 {
625 /* Wait here until we get signaled, and then restart */
627 WAIT_EVENT_REPLICATION_SLOT_DROP);
629 goto retry;
630 }
631
633 (errcode(ERRCODE_OBJECT_IN_USE),
634 errmsg("replication slot \"%s\" is active for PID %d",
635 NameStr(s->data.name), active_pid)));
636 }
637 else if (!nowait)
638 ConditionVariableCancelSleep(); /* no sleep needed after all */
639
640 /* We made this slot active, so it's ours now. */
642
643 /*
644 * We need to check for invalidation after making the slot ours to avoid
645 * the possible race condition with the checkpointer that can otherwise
646 * invalidate the slot immediately after the check.
647 */
648 if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
650 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
651 errmsg("can no longer access replication slot \"%s\"",
652 NameStr(s->data.name)),
653 errdetail("This replication slot has been invalidated due to \"%s\".",
655
656 /* Let everybody know we've modified this slot */
658
659 /*
660 * The call to pgstat_acquire_replslot() protects against stats for a
661 * different slot, from before a restart or such, being present during
662 * pgstat_report_replslot().
663 */
664 if (SlotIsLogical(s))
666
667
668 if (am_walsender)
669 {
672 ? errmsg("acquired logical replication slot \"%s\"",
673 NameStr(s->data.name))
674 : errmsg("acquired physical replication slot \"%s\"",
675 NameStr(s->data.name)));
676 }
677}
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int MyProcPid
Definition: globals.c:48
bool IsUnderPostmaster
Definition: globals.c:121
void pgstat_acquire_replslot(ReplicationSlot *slot)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:479
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition: slot.c:2627
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition: slot.h:239
pid_t active_pid
Definition: slot.h:164
ConditionVariable active_cv
Definition: slot.h:191
const char * name
bool am_walsender
Definition: walsender.c:120
bool log_replication_commands
Definition: walsender.c:130

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, am_walsender, Assert(), ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DEBUG1, ereport, errcode(), errdetail(), errmsg(), ERROR, GetSlotInvalidationCauseName(), ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, IsUnderPostmaster, LOG, log_replication_commands, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, pgstat_acquire_replslot(), ReplicationSlotSetInactiveSince(), RS_INVAL_NONE, SearchNamedReplicationSlot(), SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by binary_upgrade_logical_slot_has_caught_up(), drop_local_obsolete_slots(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), ReplicationSlotAlter(), ReplicationSlotDrop(), StartLogicalReplication(), StartReplication(), and synchronize_one_slot().

◆ ReplicationSlotAlter()

void ReplicationSlotAlter ( const char *  name,
const bool *  failover,
const bool *  two_phase 
)

Definition at line 837 of file slot.c.

839{
840 bool update_slot = false;
841
842 Assert(MyReplicationSlot == NULL);
844
845 ReplicationSlotAcquire(name, false, true);
846
849 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
850 errmsg("cannot use %s with a physical replication slot",
851 "ALTER_REPLICATION_SLOT"));
852
853 if (RecoveryInProgress())
854 {
855 /*
856 * Do not allow users to alter the slots which are currently being
857 * synced from the primary to the standby.
858 */
861 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
862 errmsg("cannot alter replication slot \"%s\"", name),
863 errdetail("This replication slot is being synchronized from the primary server."));
864
865 /*
866 * Do not allow users to enable failover on the standby as we do not
867 * support sync to the cascading standby.
868 */
869 if (failover && *failover)
871 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
872 errmsg("cannot enable failover for a replication slot"
873 " on the standby"));
874 }
875
876 if (failover)
877 {
878 /*
879 * Do not allow users to enable failover for temporary slots as we do
880 * not support syncing temporary slots to the standby.
881 */
884 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
885 errmsg("cannot enable failover for a temporary replication slot"));
886
888 {
892
893 update_slot = true;
894 }
895 }
896
898 {
902
903 update_slot = true;
904 }
905
906 if (update_slot)
907 {
910 }
911
913}
static bool two_phase
static bool failover
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:559
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1061
void ReplicationSlotSave(void)
Definition: slot.c:1043
void ReplicationSlotRelease(void)
Definition: slot.c:686
#define SlotIsPhysical(slot)
Definition: slot.h:220
ReplicationSlotPersistency persistency
Definition: slot.h:81
bool RecoveryInProgress(void)
Definition: xlog.c:6522

References Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, failover, ReplicationSlotPersistentData::failover, ReplicationSlot::mutex, MyReplicationSlot, name, ReplicationSlotPersistentData::persistency, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), RS_TEMPORARY, SlotIsPhysical, SpinLockAcquire, SpinLockRelease, ReplicationSlotPersistentData::synced, two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by AlterReplicationSlot().

◆ ReplicationSlotCleanup()

void ReplicationSlotCleanup ( bool  synced_only)

Definition at line 775 of file slot.c.

776{
777 int i;
778
779 Assert(MyReplicationSlot == NULL);
780
781restart:
782 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
783 for (i = 0; i < max_replication_slots; i++)
784 {
786
787 if (!s->in_use)
788 continue;
789
791 if ((s->active_pid == MyProcPid &&
792 (!synced_only || s->data.synced)))
793 {
796 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
797
799
801 goto restart;
802 }
803 else
805 }
806
807 LWLockRelease(ReplicationSlotControlLock);
808}
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:936

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropPtr(), RS_TEMPORARY, SpinLockAcquire, SpinLockRelease, and ReplicationSlotPersistentData::synced.

Referenced by PostgresMain(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), SyncReplicationSlots(), and WalSndErrorCleanup().

◆ ReplicationSlotCreate()

void ReplicationSlotCreate ( const char *  name,
bool  db_specific,
ReplicationSlotPersistency  persistency,
bool  two_phase,
bool  failover,
bool  synced 
)

Definition at line 324 of file slot.c.

327{
328 ReplicationSlot *slot = NULL;
329 int i;
330
331 Assert(MyReplicationSlot == NULL);
332
334
335 if (failover)
336 {
337 /*
338 * Do not allow users to create the failover enabled slots on the
339 * standby as we do not support sync to the cascading standby.
340 *
341 * However, failover enabled slots can be created during slot
342 * synchronization because we need to retain the same values as the
343 * remote slot.
344 */
347 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
348 errmsg("cannot enable failover for a replication slot created on the standby"));
349
350 /*
351 * Do not allow users to create failover enabled temporary slots,
352 * because temporary slots will not be synced to the standby.
353 *
354 * However, failover enabled temporary slots can be created during
355 * slot synchronization. See the comments atop slotsync.c for details.
356 */
357 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
359 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
360 errmsg("cannot enable failover for a temporary replication slot"));
361 }
362
363 /*
364 * If some other backend ran this code concurrently with us, we'd likely
365 * both allocate the same slot, and that would be bad. We'd also be at
366 * risk of missing a name collision. Also, we don't want to try to create
367 * a new slot while somebody's busy cleaning up an old one, because we
368 * might both be monkeying with the same directory.
369 */
370 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
371
372 /*
373 * Check for name collision, and identify an allocatable slot. We need to
374 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
375 * else can change the in_use flags while we're looking at them.
376 */
377 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
378 for (i = 0; i < max_replication_slots; i++)
379 {
381
382 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
385 errmsg("replication slot \"%s\" already exists", name)));
386 if (!s->in_use && slot == NULL)
387 slot = s;
388 }
389 LWLockRelease(ReplicationSlotControlLock);
390
391 /* If all slots are in use, we're out of luck. */
392 if (slot == NULL)
394 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
395 errmsg("all replication slots are in use"),
396 errhint("Free one or increase \"max_replication_slots\".")));
397
398 /*
399 * Since this slot is not in use, nobody should be looking at any part of
400 * it other than the in_use field unless they're trying to allocate it.
401 * And since we hold ReplicationSlotAllocationLock, nobody except us can
402 * be doing that. So it's safe to initialize the slot.
403 */
404 Assert(!slot->in_use);
405 Assert(slot->active_pid == 0);
406
407 /* first initialize persistent data */
408 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
409 namestrcpy(&slot->data.name, name);
410 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
411 slot->data.persistency = persistency;
412 slot->data.two_phase = two_phase;
414 slot->data.failover = failover;
415 slot->data.synced = synced;
416
417 /* and then data only present in shared memory */
418 slot->just_dirtied = false;
419 slot->dirty = false;
427 slot->inactive_since = 0;
428
429 /*
430 * Create the slot on disk. We haven't actually marked the slot allocated
431 * yet, so no special cleanup is required if this errors out.
432 */
433 CreateSlotOnDisk(slot);
434
435 /*
436 * We need to briefly prevent any other backend from iterating over the
437 * slots while we flip the in_use flag. We also need to set the active
438 * flag while holding the ControlLock as otherwise a concurrent
439 * ReplicationSlotAcquire() could acquire the slot as well.
440 */
441 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
442
443 slot->in_use = true;
444
445 /* We can now mark the slot active, and that makes it our slot. */
446 SpinLockAcquire(&slot->mutex);
447 Assert(slot->active_pid == 0);
448 slot->active_pid = MyProcPid;
449 SpinLockRelease(&slot->mutex);
450 MyReplicationSlot = slot;
451
452 LWLockRelease(ReplicationSlotControlLock);
453
454 /*
455 * Create statistics entry for the new logical slot. We don't collect any
456 * stats for physical slots, so no need to create an entry for the same.
457 * See ReplicationSlotDropPtr for why we need to do this before releasing
458 * ReplicationSlotAllocationLock.
459 */
460 if (SlotIsLogical(slot))
462
463 /*
464 * Now that the slot has been marked as in_use and active, it's safe to
465 * let somebody else try to allocate a slot.
466 */
467 LWLockRelease(ReplicationSlotAllocationLock);
468
469 /* Let everybody know we've modified this slot */
471}
int errhint(const char *fmt,...)
Definition: elog.c:1318
Oid MyDatabaseId
Definition: globals.c:95
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void pgstat_create_replslot(ReplicationSlot *slot)
#define InvalidOid
Definition: postgres_ext.h:35
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:2150
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:267
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1653
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:201
TransactionId effective_catalog_xmin
Definition: slot.h:182
XLogRecPtr candidate_restart_valid
Definition: slot.h:202
TransactionId effective_xmin
Definition: slot.h:181
XLogRecPtr candidate_restart_lsn
Definition: slot.h:203
TransactionId candidate_catalog_xmin
Definition: slot.h:200
TimestampTz inactive_since
Definition: slot.h:217
#define InvalidTransactionId
Definition: transam.h:31
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ConditionVariableBroadcast(), CreateSlotOnDisk(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::dirty, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, failover, ReplicationSlotPersistentData::failover, i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsSyncingReplicationSlots(), ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyDatabaseId, MyProcPid, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, pgstat_create_replslot(), RecoveryInProgress(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotValidateName(), RS_TEMPORARY, SlotIsLogical, SpinLockAcquire, SpinLockRelease, ReplicationSlotPersistentData::synced, two_phase, ReplicationSlotPersistentData::two_phase, and ReplicationSlotPersistentData::two_phase_at.

Referenced by create_logical_replication_slot(), create_physical_replication_slot(), CreateReplicationSlot(), and synchronize_one_slot().

◆ ReplicationSlotDrop()

void ReplicationSlotDrop ( const char *  name,
bool  nowait 
)

Definition at line 814 of file slot.c.

815{
816 Assert(MyReplicationSlot == NULL);
817
818 ReplicationSlotAcquire(name, nowait, false);
819
820 /*
821 * Do not allow users to drop the slots which are currently being synced
822 * from the primary to the standby.
823 */
824 if (RecoveryInProgress() && MyReplicationSlot->data.synced)
825 ereport(ERROR,
826 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
827 errmsg("cannot drop replication slot \"%s\"", name),
828 errdetail("This replication slot is being synchronized from the primary server."));
829
830 ReplicationSlotDropAcquired();
831}
void ReplicationSlotDropAcquired(void)
Definition: slot.c:919

References Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, MyReplicationSlot, name, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), and ReplicationSlotPersistentData::synced.

Referenced by DropReplicationSlot(), and pg_drop_replication_slot().

◆ ReplicationSlotDropAcquired()

void ReplicationSlotDropAcquired ( void  )

Definition at line 919 of file slot.c.

920{
921 ReplicationSlot *slot = MyReplicationSlot;
922
923 Assert(MyReplicationSlot != NULL);
924
925 /* slot isn't acquired anymore */
926 MyReplicationSlot = NULL;
927
928 ReplicationSlotDropPtr(slot);
929}

References Assert(), MyReplicationSlot, and ReplicationSlotDropPtr().

Referenced by drop_local_obsolete_slots(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ ReplicationSlotDropAtPubNode()

void ReplicationSlotDropAtPubNode ( WalReceiverConn wrconn,
char *  slotname,
bool  missing_ok 
)

Definition at line 1913 of file subscriptioncmds.c.

1914{
1915 StringInfoData cmd;
1916
1917 Assert(wrconn);
1918
1919 load_file("libpqwalreceiver", false);
1920
1921 initStringInfo(&cmd);
1922 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
1923
1924 PG_TRY();
1925 {
1926 WalRcvExecResult *res;
1927
1928 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
1929
1930 if (res->status == WALRCV_OK_COMMAND)
1931 {
1932 /* NOTICE. Success. */
1933 ereport(NOTICE,
1934 (errmsg("dropped replication slot \"%s\" on publisher",
1935 slotname)));
1936 }
1937 else if (res->status == WALRCV_ERROR &&
1938 missing_ok &&
1939 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1940 {
1941 /* LOG. Error, but missing_ok = true. */
1942 ereport(LOG,
1943 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
1944 slotname, res->err)));
1945 }
1946 else
1947 {
1948 /* ERROR. */
1949 ereport(ERROR,
1950 (errcode(ERRCODE_CONNECTION_FAILURE),
1951 errmsg("could not drop replication slot \"%s\" on publisher: %s",
1952 slotname, res->err)));
1953 }
1954
1955 walrcv_clear_result(res);
1956 }
1957 PG_FINALLY();
1958 {
1959 pfree(cmd.data);
1960 }
1961 PG_END_TRY();
1962}
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:134
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define NOTICE
Definition: elog.h:35
#define PG_FINALLY(...)
Definition: elog.h:389
void pfree(void *pointer)
Definition: mcxt.c:2147
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13019
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
WalRcvExecStatus status
Definition: walreceiver.h:220
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465

References appendStringInfo(), Assert(), StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, initStringInfo(), load_file(), LOG, NOTICE, pfree(), PG_END_TRY, PG_FINALLY, PG_TRY, quote_identifier(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, walrcv_clear_result(), WALRCV_ERROR, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), and process_syncing_tables_for_sync().

◆ ReplicationSlotIndex()

◆ ReplicationSlotInitialize()

void ReplicationSlotInitialize ( void  )

Definition at line 239 of file slot.c.

240{
241 before_shmem_exit(ReplicationSlotShmemExit, 0);
242}
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:248

References before_shmem_exit(), and ReplicationSlotShmemExit().

Referenced by BaseInit().

◆ ReplicationSlotMarkDirty()

◆ ReplicationSlotName()

bool ReplicationSlotName ( int  index,
Name  name 
)

Definition at line 528 of file slot.c.

529{
530 ReplicationSlot *slot;
531 bool found;
532
533 slot = &ReplicationSlotCtl->replication_slots[index];
534
535 /*
536 * Ensure that the slot cannot be dropped while we copy the name. Don't
537 * need the spinlock as the name of an existing slot cannot change.
538 */
539 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
540 found = slot->in_use;
541 if (slot->in_use)
542 namestrcpy(name, NameStr(slot->data.name));
543 LWLockRelease(ReplicationSlotControlLock);
544
545 return found;
546}
Definition: type.h:96

References ReplicationSlot::data, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by pgstat_replslot_to_serialized_name_cb().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char *  syncslotname,
Size  szslot 
)

Definition at line 1273 of file tablesync.c.

1275{
1276 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1277 relid, GetSystemIdentifier());
1278}
#define UINT64_FORMAT
Definition: c.h:521
#define snprintf
Definition: port.h:239
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4734

References GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), process_syncing_tables_for_sync(), and ReportSlotConnectionError().

◆ ReplicationSlotPersist()

◆ ReplicationSlotRelease()

void ReplicationSlotRelease ( void  )

Definition at line 686 of file slot.c.

687{
688 ReplicationSlot *slot = MyReplicationSlot;
689 char *slotname = NULL; /* keep compiler quiet */
690 bool is_logical = false; /* keep compiler quiet */
691 TimestampTz now = 0;
692
693 Assert(slot != NULL && slot->active_pid != 0);
694
695 if (am_walsender)
696 {
697 slotname = pstrdup(NameStr(slot->data.name));
698 is_logical = SlotIsLogical(slot);
699 }
700
701 if (slot->data.persistency == RS_EPHEMERAL)
702 {
703 /*
704 * Delete the slot. There is no !PANIC case where this is allowed to
705 * fail, all that may happen is an incomplete cleanup of the on-disk
706 * data.
707 */
709 }
710
711 /*
712 * If slot needed to temporarily restrain both data and catalog xmin to
713 * create the catalog snapshot, remove that temporary constraint.
714 * Snapshots can only be exported while the initial snapshot is still
715 * acquired.
716 */
717 if (!TransactionIdIsValid(slot->data.xmin) &&
719 {
720 SpinLockAcquire(&slot->mutex);
722 SpinLockRelease(&slot->mutex);
724 }
725
726 /*
727 * Set the time since the slot has become inactive. We get the current
728 * time beforehand to avoid a system call while holding the spinlock.
729 */
730 now = GetCurrentTimestamp();
731
732 if (slot->data.persistency == RS_PERSISTENT)
733 {
734 /*
735 * Mark persistent slot inactive. We're not freeing it, just
736 * disconnecting, but wake up others that may be waiting for it.
737 */
738 SpinLockAcquire(&slot->mutex);
739 slot->active_pid = 0;
741 SpinLockRelease(&slot->mutex);
743 }
744 else
746
747 MyReplicationSlot = NULL;
748
749 /* might not have been set when we've been a plain slot */
750 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
751 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
753 LWLockRelease(ProcArrayLock);
754
755 if (am_walsender)
756 {
758 is_logical
759 ? errmsg("released logical replication slot \"%s\"",
760 slotname)
761 : errmsg("released physical replication slot \"%s\"",
762 slotname));
763
764 pfree(slotname);
765 }
766}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:2322
PGPROC * MyProc
Definition: proc.c:67
PROC_HDR * ProcGlobal
Definition: proc.c:79
uint8 statusFlags
Definition: proc.h:243
int pgxactoff
Definition: proc.h:185
uint8 * statusFlags
Definition: proc.h:387
TransactionId xmin
Definition: slot.h:89

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, am_walsender, Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, ereport, errmsg(), GetCurrentTimestamp(), InvalidTransactionId, LOG, log_replication_commands, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, now(), ReplicationSlotPersistentData::persistency, pfree(), PGPROC::pgxactoff, ProcGlobal, pstrdup(), ReplicationSlotDropAcquired(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotSetInactiveSince(), RS_EPHEMERAL, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire, SpinLockRelease, PGPROC::statusFlags, PROC_HDR::statusFlags, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by binary_upgrade_logical_slot_has_caught_up(), copy_replication_slot(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), PostgresMain(), ReplicationSlotAlter(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), synchronize_one_slot(), and WalSndErrorCleanup().

◆ ReplicationSlotReserveWal()

void ReplicationSlotReserveWal ( void  )

Definition at line 1452 of file slot.c.

1453{
1455
1456 Assert(slot != NULL);
1458
1459 /*
1460 * The replication slot mechanism is used to prevent removal of required
1461 * WAL. As there is no interlock between this routine and checkpoints, WAL
1462 * segments could concurrently be removed when a now stale return value of
1463 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1464 * this happens we'll just retry.
1465 */
1466 while (true)
1467 {
1468 XLogSegNo segno;
1469 XLogRecPtr restart_lsn;
1470
1471 /*
1472 * For logical slots log a standby snapshot and start logical decoding
1473 * at exactly that position. That allows the slot to start up more
1474 * quickly. But on a standby we cannot do WAL writes, so just use the
1475 * replay pointer; effectively, an attempt to create a logical slot on
1476 * standby will cause it to wait for an xl_running_xact record to be
1477 * logged independently on the primary, so that a snapshot can be
1478 * built using the record.
1479 *
1480 * None of this is needed (or indeed helpful) for physical slots as
1481 * they'll start replay at the last logged checkpoint anyway. Instead
1482 * return the location of the last redo LSN. While that slightly
1483 * increases the chance that we have to retry, it's where a base
1484 * backup has to start replay at.
1485 */
1486 if (SlotIsPhysical(slot))
1487 restart_lsn = GetRedoRecPtr();
1488 else if (RecoveryInProgress())
1489 restart_lsn = GetXLogReplayRecPtr(NULL);
1490 else
1491 restart_lsn = GetXLogInsertRecPtr();
1492
1493 SpinLockAcquire(&slot->mutex);
1494 slot->data.restart_lsn = restart_lsn;
1495 SpinLockRelease(&slot->mutex);
1496
1497 /* prevent WAL removal as fast as possible */
1499
1500 /*
1501 * If all required WAL is still there, great, otherwise retry. The
1502 * slot should prevent further removal of WAL, unless there's a
1503 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1504 * the new restart_lsn above, so normally we should never need to loop
1505 * more than twice.
1506 */
1508 if (XLogGetLastRemovedSegno() < segno)
1509 break;
1510 }
1511
1512 if (!RecoveryInProgress() && SlotIsLogical(slot))
1513 {
1514 XLogRecPtr flushptr;
1515
1516 /* make sure we have enough information to start */
1517 flushptr = LogStandbySnapshot();
1518
1519 /* and make sure it's fsynced to disk */
1520 XLogFlush(flushptr);
1521 }
1522}
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1282
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3897
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6625
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9612
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2923
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References Assert(), ReplicationSlot::data, GetRedoRecPtr(), GetXLogInsertRecPtr(), GetXLogReplayRecPtr(), InvalidXLogRecPtr, LogStandbySnapshot(), ReplicationSlot::mutex, MyReplicationSlot, RecoveryInProgress(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SlotIsLogical, SlotIsPhysical, SpinLockAcquire, SpinLockRelease, wal_segment_size, XLByteToSeg, XLogFlush(), and XLogGetLastRemovedSegno().

Referenced by create_physical_replication_slot(), CreateInitDecodingContext(), and CreateReplicationSlot().

◆ ReplicationSlotSave()

◆ ReplicationSlotsComputeLogicalRestartLSN()

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN ( void  )

Definition at line 1205 of file slot.c.

1206{
1208 int i;
1209
1210 if (max_replication_slots <= 0)
1211 return InvalidXLogRecPtr;
1212
1213 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1214
1215 for (i = 0; i < max_replication_slots; i++)
1216 {
1217 ReplicationSlot *s;
1218 XLogRecPtr restart_lsn;
1219 bool invalidated;
1220
1222
1223 /* cannot change while ReplicationSlotControlLock is held */
1224 if (!s->in_use)
1225 continue;
1226
1227 /* we're only interested in logical slots */
1228 if (!SlotIsLogical(s))
1229 continue;
1230
1231 /* read once, it's ok if it increases while we're checking */
1233 restart_lsn = s->data.restart_lsn;
1234 invalidated = s->data.invalidated != RS_INVAL_NONE;
1236
1237 /* invalidated slots need not apply */
1238 if (invalidated)
1239 continue;
1240
1241 if (restart_lsn == InvalidXLogRecPtr)
1242 continue;
1243
1244 if (result == InvalidXLogRecPtr ||
1245 restart_lsn < result)
1246 result = restart_lsn;
1247 }
1248
1249 LWLockRelease(ReplicationSlotControlLock);
1250
1251 return result;
1252}

References ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by CheckPointLogicalRewriteHeap(), and CheckPointSnapBuild().

◆ ReplicationSlotsComputeRequiredLSN()

void ReplicationSlotsComputeRequiredLSN ( void  )

Definition at line 1156 of file slot.c.

1157{
1158 int i;
1159 XLogRecPtr min_required = InvalidXLogRecPtr;
1160
1161 Assert(ReplicationSlotCtl != NULL);
1162
1163 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1164 for (i = 0; i < max_replication_slots; i++)
1165 {
1167 XLogRecPtr restart_lsn;
1168 bool invalidated;
1169
1170 if (!s->in_use)
1171 continue;
1172
1174 restart_lsn = s->data.restart_lsn;
1175 invalidated = s->data.invalidated != RS_INVAL_NONE;
1177
1178 /* invalidated slots need not apply */
1179 if (invalidated)
1180 continue;
1181
1182 if (restart_lsn != InvalidXLogRecPtr &&
1183 (min_required == InvalidXLogRecPtr ||
1184 restart_lsn < min_required))
1185 min_required = restart_lsn;
1186 }
1187 LWLockRelease(ReplicationSlotControlLock);
1188
1190}
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2809

References Assert(), ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SpinLockAcquire, SpinLockRelease, and XLogSetReplicationSlotMinimumLSN().

Referenced by copy_replication_slot(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), ReplicationSlotDropPtr(), ReplicationSlotReserveWal(), reserve_wal_for_local_slot(), StartupReplicationSlots(), and update_local_synced_slot().

◆ ReplicationSlotsComputeRequiredXmin()

void ReplicationSlotsComputeRequiredXmin ( bool  already_locked)

Definition at line 1100 of file slot.c.

1101{
1102 int i;
1104 TransactionId agg_catalog_xmin = InvalidTransactionId;
1105
1106 Assert(ReplicationSlotCtl != NULL);
1107
1108 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1109
1110 for (i = 0; i < max_replication_slots; i++)
1111 {
1113 TransactionId effective_xmin;
1114 TransactionId effective_catalog_xmin;
1115 bool invalidated;
1116
1117 if (!s->in_use)
1118 continue;
1119
1121 effective_xmin = s->effective_xmin;
1122 effective_catalog_xmin = s->effective_catalog_xmin;
1123 invalidated = s->data.invalidated != RS_INVAL_NONE;
1125
1126 /* invalidated slots need not apply */
1127 if (invalidated)
1128 continue;
1129
1130 /* check the data xmin */
1131 if (TransactionIdIsValid(effective_xmin) &&
1132 (!TransactionIdIsValid(agg_xmin) ||
1133 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1134 agg_xmin = effective_xmin;
1135
1136 /* check the catalog xmin */
1137 if (TransactionIdIsValid(effective_catalog_xmin) &&
1138 (!TransactionIdIsValid(agg_catalog_xmin) ||
1139 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1140 agg_catalog_xmin = effective_catalog_xmin;
1141 }
1142
1143 LWLockRelease(ReplicationSlotControlLock);
1144
1145 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1146}
uint32 TransactionId
Definition: c.h:623
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3943
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References Assert(), ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ProcArraySetReplicationSlotXmin(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by copy_replication_slot(), CreateInitDecodingContext(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalReplicationSlotNewXmin(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), StartupReplicationSlots(), synchronize_one_slot(), and update_local_synced_slot().

◆ ReplicationSlotsCountDBSlots()

bool ReplicationSlotsCountDBSlots ( Oid  dboid,
int *  nslots,
int *  nactive 
)

Definition at line 1263 of file slot.c.

1264{
1265 int i;
1266
1267 *nslots = *nactive = 0;
1268
1269 if (max_replication_slots <= 0)
1270 return false;
1271
1272 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1273 for (i = 0; i < max_replication_slots; i++)
1274 {
1275 ReplicationSlot *s;
1276
1278
1279 /* cannot change while ReplicationSlotControlLock is held */
1280 if (!s->in_use)
1281 continue;
1282
1283 /* only logical slots are database specific, skip */
1284 if (!SlotIsLogical(s))
1285 continue;
1286
1287 /* not our database, skip */
1288 if (s->data.database != dboid)
1289 continue;
1290
1291 /* NB: intentionally counting invalidated slots */
1292
1293 /* count slots with spinlock held */
1295 (*nslots)++;
1296 if (s->active_pid != 0)
1297 (*nactive)++;
1299 }
1300 LWLockRelease(ReplicationSlotControlLock);
1301
1302 if (*nslots > 0)
1303 return true;
1304 return false;
1305}

References ReplicationSlot::active_pid, ReplicationSlot::data, ReplicationSlotPersistentData::database, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by dropdb().

◆ ReplicationSlotsDropDBSlots()

void ReplicationSlotsDropDBSlots ( Oid  dboid)

Definition at line 1321 of file slot.c.

1322{
1323 int i;
1324
1325 if (max_replication_slots <= 0)
1326 return;
1327
1328restart:
1329 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1330 for (i = 0; i < max_replication_slots; i++)
1331 {
1332 ReplicationSlot *s;
1333 char *slotname;
1334 int active_pid;
1335
1337
1338 /* cannot change while ReplicationSlotControlLock is held */
1339 if (!s->in_use)
1340 continue;
1341
1342 /* only logical slots are database specific, skip */
1343 if (!SlotIsLogical(s))
1344 continue;
1345
1346 /* not our database, skip */
1347 if (s->data.database != dboid)
1348 continue;
1349
1350 /* NB: intentionally including invalidated slots */
1351
1352 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1354 /* can't change while ReplicationSlotControlLock is held */
1355 slotname = NameStr(s->data.name);
1356 active_pid = s->active_pid;
1357 if (active_pid == 0)
1358 {
1360 s->active_pid = MyProcPid;
1361 }
1363
1364 /*
1365 * Even though we hold an exclusive lock on the database object a
1366 * logical slot for that DB can still be active, e.g. if it's
1367 * concurrently being dropped by a backend connected to another DB.
1368 *
1369 * That's fairly unlikely in practice, so we'll just bail out.
1370 *
1371 * The slot sync worker holds a shared lock on the database before
1372 * operating on synced logical slots to avoid conflict with the drop
1373 * happening here. The persistent synced slots are thus safe but there
1374 * is a possibility that the slot sync worker has created a temporary
1375 * slot (which stays active even on release) and we are trying to drop
1376 * that here. In practice, the chances of hitting this scenario are
1377 * low, because during slot synchronization the temporary slot is
1378 * immediately converted to persistent and thus is safe due to the
1379 * shared lock taken on the database. So, we'll just bail out in such
1380 * a case.
1381 *
1382 * XXX: We can consider shutting down the slot sync worker before
1383 * trying to drop synced temporary slots here.
1384 */
1385 if (active_pid)
1386 ereport(ERROR,
1387 (errcode(ERRCODE_OBJECT_IN_USE),
1388 errmsg("replication slot \"%s\" is active for PID %d",
1389 slotname, active_pid)));
1390
1391 /*
1392 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1393 * holding ReplicationSlotControlLock over filesystem operations,
1394 * release ReplicationSlotControlLock and use
1395 * ReplicationSlotDropAcquired.
1396 *
1397 * As that means the set of slots could change, restart scan from the
1398 * beginning each time we release the lock.
1399 */
1400 LWLockRelease(ReplicationSlotControlLock);
1402 goto restart;
1403 }
1404 LWLockRelease(ReplicationSlotControlLock);
1405}

References ReplicationSlot::active_pid, ReplicationSlot::data, ReplicationSlotPersistentData::database, ereport, errcode(), errmsg(), ERROR, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropAcquired(), SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by dbase_redo(), and dropdb().

◆ ReplicationSlotSetInactiveSince()

static void ReplicationSlotSetInactiveSince ( ReplicationSlot s,
TimestampTz  ts,
bool  acquire_lock 
)
inlinestatic

◆ ReplicationSlotsShmemInit()

void ReplicationSlotsShmemInit ( void  )

Definition at line 204 of file slot.c.

205{
206 bool found;
207
208 if (max_replication_slots == 0)
209 return;
210
212 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
213 &found);
214
215 if (!found)
216 {
217 int i;
218
219 /* First time through, so initialize */
221
222 for (i = 0; i < max_replication_slots; i++)
223 {
225
226 /* everything else is zeroed by the memset above */
227 SpinLockInit(&slot->mutex);
231 }
232 }
233}
#define MemSet(start, val, len)
Definition: c.h:991
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:721
@ LWTRANCHE_REPLICATION_SLOT_IO
Definition: lwlock.h:191
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:186
#define SpinLockInit(lock)
Definition: spin.h:57
LWLock io_in_progress_lock
Definition: slot.h:188

References ReplicationSlot::active_cv, ConditionVariableInit(), i, ReplicationSlot::io_in_progress_lock, LWLockInitialize(), LWTRANCHE_REPLICATION_SLOT_IO, max_replication_slots, MemSet, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsShmemSize(), ShmemInitStruct(), and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationSlotsShmemSize()

Size ReplicationSlotsShmemSize ( void  )

Definition at line 186 of file slot.c.

187{
188 Size size = 0;
189
190 if (max_replication_slots == 0)
191 return size;
192
193 size = offsetof(ReplicationSlotCtlData, replication_slots);
194 size = add_size(size,
195 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
196
197 return size;
198}
size_t Size
Definition: c.h:576
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationSlotsShmemInit().

◆ ReplicationSlotValidateName()

bool ReplicationSlotValidateName ( const char *  name,
int  elevel 
)

Definition at line 267 of file slot.c.

268{
269 const char *cp;
270
271 if (strlen(name) == 0)
272 {
273 ereport(elevel,
274 (errcode(ERRCODE_INVALID_NAME),
275 errmsg("replication slot name \"%s\" is too short",
276 name)));
277 return false;
278 }
279
280 if (strlen(name) >= NAMEDATALEN)
281 {
282 ereport(elevel,
283 (errcode(ERRCODE_NAME_TOO_LONG),
284 errmsg("replication slot name \"%s\" is too long",
285 name)));
286 return false;
287 }
288
289 for (cp = name; *cp; cp++)
290 {
291 if (!((*cp >= 'a' && *cp <= 'z')
292 || (*cp >= '0' && *cp <= '9')
293 || (*cp == '_')))
294 {
295 ereport(elevel,
296 (errcode(ERRCODE_INVALID_NAME),
297 errmsg("replication slot name \"%s\" contains invalid character",
298 name),
299 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
300 return false;
301 }
302 }
303 return true;
304}
#define NAMEDATALEN

References ereport, errcode(), errhint(), errmsg(), name, and NAMEDATALEN.

Referenced by check_primary_slot_name(), parse_subscription_options(), ReplicationSlotCreate(), and StartupReorderBuffer().

◆ SearchNamedReplicationSlot()

ReplicationSlot * SearchNamedReplicationSlot ( const char *  name,
bool  need_lock 
)

Definition at line 479 of file slot.c.

480{
481 int i;
482 ReplicationSlot *slot = NULL;
483
484 if (need_lock)
485 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
486
487 for (i = 0; i < max_replication_slots; i++)
488 {
489 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
490
491 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
492 {
493 slot = s;
494 break;
495 }
496 }
497
498 if (need_lock)
499 LWLockRelease(ReplicationSlotControlLock);
500
501 return slot;
502}

References ReplicationSlot::data, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, name, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by get_replslot_index(), pg_ls_replslotdir(), pgstat_reset_replslot(), ReadReplicationSlot(), ReplicationSlotAcquire(), StandbySlotsHaveCaughtup(), synchronize_one_slot(), and validate_sync_standby_slots().

◆ SlotExistsInSyncStandbySlots()

bool SlotExistsInSyncStandbySlots ( const char *  slot_name)

Definition at line 2772 of file slot.c.

2773{
2774 const char *standby_slot_name;
2775
2776 /* Return false if there is no value in synchronized_standby_slots */
2777 if (synchronized_standby_slots_config == NULL)
2778 return false;
2779
2780 /*
2781 * XXX: We are not expecting this list to be long so a linear search
2782 * shouldn't hurt but if that turns out not to be true then we can cache
2783 * this information for each WalSender as well.
2784 */
2785 standby_slot_name = synchronized_standby_slots_config->slot_names;
2786 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2787 {
2788 if (strcmp(standby_slot_name, slot_name) == 0)
2789 return true;
2790
2791 standby_slot_name += strlen(standby_slot_name) + 1;
2792 }
2793
2794 return false;
2795}
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition: slot.c:166
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition: slot.c:100

References i, SyncStandbySlotsConfigData::nslotnames, SyncStandbySlotsConfigData::slot_names, and synchronized_standby_slots_config.

Referenced by PhysicalWakeupLogicalWalSnd().

◆ StandbySlotsHaveCaughtup()

bool StandbySlotsHaveCaughtup ( XLogRecPtr  wait_for_lsn,
int  elevel 
)

Definition at line 2805 of file slot.c.

2806{
2807 const char *name;
2808 int caught_up_slot_num = 0;
2809 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2810
2811 /*
2812 * Don't need to wait for the standbys to catch up if there is no value in
2813 * synchronized_standby_slots.
2814 */
2816 return true;
2817
2818 /*
2819 * Don't need to wait for the standbys to catch up if we are on a standby
2820 * server, since we do not support syncing slots to cascading standbys.
2821 */
2822 if (RecoveryInProgress())
2823 return true;
2824
2825 /*
2826 * Don't need to wait for the standbys to catch up if they are already
2827 * beyond the specified WAL location.
2828 */
2830 ss_oldest_flush_lsn >= wait_for_lsn)
2831 return true;
2832
2833 /*
2834 * To prevent concurrent slot dropping and creation while filtering the
2835 * slots, take the ReplicationSlotControlLock outside of the loop.
2836 */
2837 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2838
2840 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2841 {
2842 XLogRecPtr restart_lsn;
2843 bool invalidated;
2844 bool inactive;
2845 ReplicationSlot *slot;
2846
2847 slot = SearchNamedReplicationSlot(name, false);
2848
2849 /*
2850 * If a slot name provided in synchronized_standby_slots does not
2851 * exist, report a message and exit the loop.
2852 *
2853 * Though validate_sync_standby_slots (the GUC check_hook) tries to
2854 * avoid this, it can nonetheless happen because the user can specify
2855 * a nonexistent slot name before server startup. That function cannot
2856 * validate such a slot during startup, as ReplicationSlotCtl is not
2857 * initialized by then. Also, the user might have dropped one slot.
2858 */
2859 if (!slot)
2860 {
2861 ereport(elevel,
2862 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2863 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2864 name, "synchronized_standby_slots"),
2865 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2866 name),
2867 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2868 name, "synchronized_standby_slots"));
2869 break;
2870 }
2871
2872 /* Same as above: if a slot is not physical, exit the loop. */
2873 if (SlotIsLogical(slot))
2874 {
2875 ereport(elevel,
2876 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2877 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2878 name, "synchronized_standby_slots"),
2879 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2880 name),
2881 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2882 name, "synchronized_standby_slots"));
2883 break;
2884 }
2885
2886 SpinLockAcquire(&slot->mutex);
2887 restart_lsn = slot->data.restart_lsn;
2888 invalidated = slot->data.invalidated != RS_INVAL_NONE;
2889 inactive = slot->active_pid == 0;
2890 SpinLockRelease(&slot->mutex);
2891
2892 if (invalidated)
2893 {
2894 /* Specified physical slot has been invalidated */
2895 ereport(elevel,
2896 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2897 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2898 name, "synchronized_standby_slots"),
2899 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2900 name),
2901 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
2902 name, "synchronized_standby_slots"));
2903 break;
2904 }
2905
2906 if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2907 {
2908 /* Log a message if no active_pid for this physical slot */
2909 if (inactive)
2910 ereport(elevel,
2911 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2912 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
2913 name, "synchronized_standby_slots"),
2914 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2915 name),
2916 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
2917 name, "synchronized_standby_slots"));
2918
2919 /* Stop checking: the current slot hasn't caught up. */
2920 break;
2921 }
2922
2923 Assert(restart_lsn >= wait_for_lsn);
2924
2925 if (XLogRecPtrIsInvalid(min_restart_lsn) ||
2926 min_restart_lsn > restart_lsn)
2927 min_restart_lsn = restart_lsn;
2928
2929 caught_up_slot_num++;
2930
2931 name += strlen(name) + 1;
2932 }
2933
2934 LWLockRelease(ReplicationSlotControlLock);
2935
2936 /*
2937 * Return false if not all the standbys have caught up to the specified
2938 * WAL location.
2939 */
2940 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
2941 return false;
2942
2943 /* The ss_oldest_flush_lsn must not retreat. */
2945 min_restart_lsn >= ss_oldest_flush_lsn);
2946
2947 ss_oldest_flush_lsn = min_restart_lsn;
2948
2949 return true;
2950}
static XLogRecPtr ss_oldest_flush_lsn
Definition: slot.c:172
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References ReplicationSlot::active_pid, Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errhint(), errmsg(), i, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, name, SyncStandbySlotsConfigData::nslotnames, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SearchNamedReplicationSlot(), SyncStandbySlotsConfigData::slot_names, SlotIsLogical, SpinLockAcquire, SpinLockRelease, ss_oldest_flush_lsn, synchronized_standby_slots_config, and XLogRecPtrIsInvalid.

Referenced by NeedToWaitForStandbys(), and WaitForStandbyConfirmation().

◆ StartupReplicationSlots()

void StartupReplicationSlots ( void  )

Definition at line 2089 of file slot.c.

2090{
2091 DIR *replication_dir;
2092 struct dirent *replication_de;
2093
2094 elog(DEBUG1, "starting up replication slots");
2095
2096 /* restore all slots by iterating over all on-disk entries */
2097 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2098 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2099 {
2100 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2101 PGFileType de_type;
2102
2103 if (strcmp(replication_de->d_name, ".") == 0 ||
2104 strcmp(replication_de->d_name, "..") == 0)
2105 continue;
2106
2107 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2108 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2109
2110 /* we're only creating directories here, skip if it's not our's */
2111 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2112 continue;
2113
2114 /* we crashed while a slot was being setup or deleted, clean up */
2115 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2116 {
2117 if (!rmtree(path, true))
2118 {
2120 (errmsg("could not remove directory \"%s\"",
2121 path)));
2122 continue;
2123 }
2125 continue;
2126 }
2127
2128 /* looks like a slot in a normal state, restore */
2129 RestoreSlotFromDisk(replication_de->d_name);
2130 }
2131 FreeDir(replication_dir);
2132
2133 /* currently no slots exist, we're done. */
2134 if (max_replication_slots <= 0)
2135 return;
2136
2137 /* Now that we have recovered all the data, compute replication xmin */
2140}
#define WARNING
Definition: elog.h:36
int FreeDir(DIR *dir)
Definition: fd.c:3025
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2907
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2973
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:547
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:2366
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:31
DIR
Definition: dirent.c:26
dirent
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15

References AllocateDir(), dirent::d_name, DEBUG1, elog, ereport, errmsg(), FreeDir(), fsync_fname(), get_dirent_type(), max_replication_slots, MAXPGPATH, PG_REPLSLOT_DIR, pg_str_endswith(), PGFILETYPE_DIR, PGFILETYPE_ERROR, ReadDir(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RestoreSlotFromDisk(), rmtree(), snprintf, and WARNING.

Referenced by StartupXLOG().

◆ WaitForStandbyConfirmation()

void WaitForStandbyConfirmation ( XLogRecPtr  wait_for_lsn)

Definition at line 2959 of file slot.c.

2960{
2961 /*
2962 * Don't need to wait for the standby to catch up if the current acquired
2963 * slot is not a logical failover slot, or there is no value in
2964 * synchronized_standby_slots.
2965 */
2967 return;
2968
2970
2971 for (;;)
2972 {
2974
2976 {
2977 ConfigReloadPending = false;
2979 }
2980
2981 /* Exit if done waiting for every slot. */
2982 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
2983 break;
2984
2985 /*
2986 * Wait for the slots in the synchronized_standby_slots to catch up,
2987 * but use a timeout (1s) so we can also check if the
2988 * synchronized_standby_slots has been changed.
2989 */
2991 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
2992 }
2993
2995}
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2805
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:114

References CHECK_FOR_INTERRUPTS, ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableTimedSleep(), ConfigReloadPending, ReplicationSlot::data, ReplicationSlotPersistentData::failover, MyReplicationSlot, PGC_SIGHUP, ProcessConfigFile(), StandbySlotsHaveCaughtup(), synchronized_standby_slots_config, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtl, and WARNING.

Referenced by LogicalSlotAdvanceAndCheckSnapState(), and pg_logical_slot_get_changes_guts().

Variable Documentation

◆ idle_replication_slot_timeout_mins

PGDLLIMPORT int idle_replication_slot_timeout_mins
extern

◆ max_replication_slots

◆ MyReplicationSlot

PGDLLIMPORT ReplicationSlot* MyReplicationSlot
extern

Definition at line 147 of file slot.c.

Referenced by binary_upgrade_logical_slot_has_caught_up(), copy_replication_slot(), create_logical_replication_slot(), create_physical_replication_slot(), CreateDecodingContext(), CreateInitDecodingContext(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), LogicalConfirmReceivedLocation(), LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), LogicalReplicationSlotHasPendingWal(), LogicalSlotAdvanceAndCheckSnapState(), NeedToWaitForStandbys(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_physical_replication_slot_advance(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), PhysicalReplicationSlotNewXmin(), PhysicalWakeupLogicalWalSnd(), PostgresMain(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), ReorderBufferAllocate(), ReorderBufferFree(), ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReplicationSlotAcquire(), ReplicationSlotAlter(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDrop(), ReplicationSlotDropAcquired(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotsDropDBSlots(), ReplicationSlotShmemExit(), reserve_wal_for_local_slot(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), StartupDecodingContext(), synchronize_one_slot(), update_and_persist_local_synced_slot(), update_local_synced_slot(), WaitForStandbyConfirmation(), and WalSndErrorCleanup().

◆ ReplicationSlotCtl

◆ synchronized_standby_slots

PGDLLIMPORT char* synchronized_standby_slots
extern

Definition at line 163 of file slot.c.