PostgreSQL Source Code git master
slot.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "common/file_utils.h"
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/injection_point.h"
#include "utils/varlena.h"
Include dependency graph for slot.c:

Go to the source code of this file.

Data Structures

struct  ReplicationSlotOnDisk
 
struct  SyncStandbySlotsConfigData
 
struct  SlotInvalidationCauseMap
 

Macros

#define ReplicationSlotOnDiskConstantSize    offsetof(ReplicationSlotOnDisk, slotdata)
 
#define ReplicationSlotOnDiskNotChecksummedSize    offsetof(ReplicationSlotOnDisk, version)
 
#define ReplicationSlotOnDiskChecksummedSize    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
 
#define ReplicationSlotOnDiskV2Size    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
#define SLOT_MAGIC   0x1051CA1 /* format identifier */
 
#define SLOT_VERSION   5 /* version for new files */
 

Typedefs

typedef struct ReplicationSlotOnDisk ReplicationSlotOnDisk
 
typedef struct SlotInvalidationCauseMap SlotInvalidationCauseMap
 

Functions

 StaticAssertDecl (lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
 
static void ReplicationSlotShmemExit (int code, Datum arg)
 
static void ReplicationSlotDropPtr (ReplicationSlot *slot)
 
static void RestoreSlotFromDisk (const char *name)
 
static void CreateSlotOnDisk (ReplicationSlot *slot)
 
static void SaveSlotToPath (ReplicationSlot *slot, const char *dir, int elevel)
 
Size ReplicationSlotsShmemSize (void)
 
void ReplicationSlotsShmemInit (void)
 
void ReplicationSlotInitialize (void)
 
bool ReplicationSlotValidateName (const char *name, int elevel)
 
void ReplicationSlotCreate (const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
 
ReplicationSlot * SearchNamedReplicationSlot (const char *name, bool need_lock)
 
int ReplicationSlotIndex (ReplicationSlot *slot)
 
bool ReplicationSlotName (int index, Name name)
 
void ReplicationSlotAcquire (const char *name, bool nowait, bool error_if_invalid)
 
void ReplicationSlotRelease (void)
 
void ReplicationSlotCleanup (bool synced_only)
 
void ReplicationSlotDrop (const char *name, bool nowait)
 
void ReplicationSlotAlter (const char *name, const bool *failover, const bool *two_phase)
 
void ReplicationSlotDropAcquired (void)
 
void ReplicationSlotSave (void)
 
void ReplicationSlotMarkDirty (void)
 
void ReplicationSlotPersist (void)
 
void ReplicationSlotsComputeRequiredXmin (bool already_locked)
 
void ReplicationSlotsComputeRequiredLSN (void)
 
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN (void)
 
bool ReplicationSlotsCountDBSlots (Oid dboid, int *nslots, int *nactive)
 
void ReplicationSlotsDropDBSlots (Oid dboid)
 
void CheckSlotRequirements (void)
 
void CheckSlotPermissions (void)
 
void ReplicationSlotReserveWal (void)
 
static void ReportSlotInvalidation (ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
 
static bool CanInvalidateIdleSlot (ReplicationSlot *s)
 
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause (uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TransactionId initial_effective_xmin, TransactionId initial_catalog_effective_xmin, XLogRecPtr initial_restart_lsn, TimestampTz *inactive_since, TimestampTz now)
 
static bool InvalidatePossiblyObsoleteSlot (uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
 
bool InvalidateObsoleteReplicationSlots (uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
 
void CheckPointReplicationSlots (bool is_shutdown)
 
void StartupReplicationSlots (void)
 
ReplicationSlotInvalidationCause GetSlotInvalidationCause (const char *cause_name)
 
const char * GetSlotInvalidationCauseName (ReplicationSlotInvalidationCause cause)
 
static bool validate_sync_standby_slots (char *rawname, List **elemlist)
 
bool check_synchronized_standby_slots (char **newval, void **extra, GucSource source)
 
void assign_synchronized_standby_slots (const char *newval, void *extra)
 
bool SlotExistsInSyncStandbySlots (const char *slot_name)
 
bool StandbySlotsHaveCaughtup (XLogRecPtr wait_for_lsn, int elevel)
 
void WaitForStandbyConfirmation (XLogRecPtr wait_for_lsn)
 
bool check_idle_replication_slot_timeout (int *newval, void **extra, GucSource source)
 

Variables

static const SlotInvalidationCauseMap SlotInvalidationCauses []
 
ReplicationSlotCtlData * ReplicationSlotCtl = NULL
 
ReplicationSlot * MyReplicationSlot = NULL
 
int max_replication_slots = 10
 
int idle_replication_slot_timeout_mins = 0
 
char * synchronized_standby_slots
 
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
 
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr
 

Macro Definition Documentation

◆ ReplicationSlotOnDiskChecksummedSize

#define ReplicationSlotOnDiskChecksummedSize    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize

Definition at line 134 of file slot.c.

◆ ReplicationSlotOnDiskConstantSize

#define ReplicationSlotOnDiskConstantSize    offsetof(ReplicationSlotOnDisk, slotdata)

Definition at line 128 of file slot.c.

◆ ReplicationSlotOnDiskNotChecksummedSize

#define ReplicationSlotOnDiskNotChecksummedSize    offsetof(ReplicationSlotOnDisk, version)

Definition at line 131 of file slot.c.

◆ ReplicationSlotOnDiskV2Size

#define ReplicationSlotOnDiskV2Size    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize

Definition at line 137 of file slot.c.

◆ SLOT_MAGIC

#define SLOT_MAGIC   0x1051CA1 /* format identifier */

Definition at line 140 of file slot.c.

◆ SLOT_VERSION

#define SLOT_VERSION   5 /* version for new files */

Definition at line 141 of file slot.c.

Typedef Documentation

◆ ReplicationSlotOnDisk

◆ SlotInvalidationCauseMap

Function Documentation

◆ assign_synchronized_standby_slots()

void assign_synchronized_standby_slots ( const char *  newval,
void *  extra 
)

Definition at line 2755 of file slot.c.

2756{
2757 /*
2758 * The standby slots may have changed, so we must recompute the oldest
2759 * LSN.
2760 */
2762
2764}
static XLogRecPtr ss_oldest_flush_lsn
Definition: slot.c:172
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition: slot.c:166
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References InvalidXLogRecPtr, ss_oldest_flush_lsn, and synchronized_standby_slots_config.

◆ CanInvalidateIdleSlot()

static bool CanInvalidateIdleSlot ( ReplicationSlot *  s)
inline static

Definition at line 1613 of file slot.c.

1614{
1617 s->inactive_since > 0 &&
1618 !(RecoveryInProgress() && s->data.synced));
1619}
int idle_replication_slot_timeout_mins
Definition: slot.c:157
ReplicationSlotPersistentData data
Definition: slot.h:185
TimestampTz inactive_since
Definition: slot.h:217
bool RecoveryInProgress(void)
Definition: xlog.c:6380
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References ReplicationSlot::data, idle_replication_slot_timeout_mins, ReplicationSlot::inactive_since, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, ReplicationSlotPersistentData::synced, and XLogRecPtrIsInvalid.

Referenced by DetermineSlotInvalidationCause().

◆ check_idle_replication_slot_timeout()

bool check_idle_replication_slot_timeout ( int *  newval,
void **  extra,
GucSource  source 
)

Definition at line 3002 of file slot.c.

3003{
3004 if (IsBinaryUpgrade && *newval != 0)
3005 {
3006 GUC_check_errdetail("\"%s\" must be set to 0 during binary upgrade mode.",
3007 "idle_replication_slot_timeout");
3008 return false;
3009 }
3010
3011 return true;
3012}
bool IsBinaryUpgrade
Definition: globals.c:120
#define newval
#define GUC_check_errdetail
Definition: guc.h:481

References GUC_check_errdetail, IsBinaryUpgrade, and newval.

◆ check_synchronized_standby_slots()

bool check_synchronized_standby_slots ( char **  newval,
void **  extra,
GucSource  source 
)

Definition at line 2701 of file slot.c.

2702{
2703 char *rawname;
2704 char *ptr;
2705 List *elemlist;
2706 int size;
2707 bool ok;
2709
2710 if ((*newval)[0] == '\0')
2711 return true;
2712
2713 /* Need a modifiable copy of the GUC string */
2714 rawname = pstrdup(*newval);
2715
2716 /* Now verify if the specified slots exist and have correct type */
2717 ok = validate_sync_standby_slots(rawname, &elemlist);
2718
2719 if (!ok || elemlist == NIL)
2720 {
2721 pfree(rawname);
2722 list_free(elemlist);
2723 return ok;
2724 }
2725
2726 /* Compute the size required for the SyncStandbySlotsConfigData struct */
2727 size = offsetof(SyncStandbySlotsConfigData, slot_names);
2728 foreach_ptr(char, slot_name, elemlist)
2729 size += strlen(slot_name) + 1;
2730
2731 /* GUC extra value must be guc_malloc'd, not palloc'd */
2732 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2733
2734 /* Transform the data into SyncStandbySlotsConfigData */
2735 config->nslotnames = list_length(elemlist);
2736
2737 ptr = config->slot_names;
2738 foreach_ptr(char, slot_name, elemlist)
2739 {
2740 strcpy(ptr, slot_name);
2741 ptr += strlen(slot_name) + 1;
2742 }
2743
2744 *extra = config;
2745
2746 pfree(rawname);
2747 list_free(elemlist);
2748 return true;
2749}
#define LOG
Definition: elog.h:31
void * guc_malloc(int elevel, size_t size)
Definition: guc.c:638
void list_free(List *list)
Definition: list.c:1546
char * pstrdup(const char *in)
Definition: mcxt.c:1699
void pfree(void *pointer)
Definition: mcxt.c:1524
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition: slot.c:2646
Definition: pg_list.h:54
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition: slot.c:100

References foreach_ptr, guc_malloc(), list_free(), list_length(), LOG, newval, NIL, SyncStandbySlotsConfigData::nslotnames, pfree(), pstrdup(), SyncStandbySlotsConfigData::slot_names, and validate_sync_standby_slots().

◆ CheckPointReplicationSlots()

void CheckPointReplicationSlots ( bool  is_shutdown)

Definition at line 2032 of file slot.c.

2033{
2034 int i;
2035
2036 elog(DEBUG1, "performing replication slot checkpoint");
2037
2038 /*
2039 * Prevent any slot from being created/dropped while we're active. As we
2040 * explicitly do *not* want to block iterating over replication_slots or
2041 * acquiring a slot we cannot take the control lock - but that's OK,
2042 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2043 * enough to guarantee that nobody can change the in_use bits on us.
2044 */
2045 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2046
2047 for (i = 0; i < max_replication_slots; i++)
2048 {
2050 char path[MAXPGPATH];
2051
2052 if (!s->in_use)
2053 continue;
2054
2055 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2056 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2057
2058 /*
2059 * Slot's data is not flushed each time the confirmed_flush LSN is
2060 * updated as that could lead to frequent writes. However, we decide
2061 * to force a flush of all logical slot's data at the time of shutdown
2062 * if the confirmed_flush LSN is changed since we last flushed it to
2063 * disk. This helps in avoiding an unnecessary retreat of the
2064 * confirmed_flush LSN after restart.
2065 */
2066 if (is_shutdown && SlotIsLogical(s))
2067 {
2069
2070 if (s->data.invalidated == RS_INVAL_NONE &&
2072 {
2073 s->just_dirtied = true;
2074 s->dirty = true;
2075 }
2077 }
2078
2079 SaveSlotToPath(s, path, LOG);
2080 }
2081 LWLockRelease(ReplicationSlotAllocationLock);
2082}
#define NameStr(name)
Definition: c.h:717
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:225
int i
Definition: isn.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1179
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1899
@ LW_SHARED
Definition: lwlock.h:115
#define MAXPGPATH
#define sprintf
Definition: port.h:241
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:2211
int max_replication_slots
Definition: slot.c:150
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:144
#define PG_REPLSLOT_DIR
Definition: slot.h:21
@ RS_INVAL_NONE
Definition: slot.h:53
#define SlotIsLogical(slot)
Definition: slot.h:221
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
ReplicationSlot replication_slots[1]
Definition: slot.h:232
XLogRecPtr confirmed_flush
Definition: slot.h:111
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:103
slock_t mutex
Definition: slot.h:158
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:210
bool in_use
Definition: slot.h:161
bool just_dirtied
Definition: slot.h:167
bool dirty
Definition: slot.h:168

References ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, DEBUG1, ReplicationSlot::dirty, elog, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, LOG, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SaveSlotToPath(), SlotIsLogical, SpinLockAcquire, SpinLockRelease, and sprintf.

Referenced by CheckPointGuts().

◆ CheckSlotPermissions()

void CheckSlotPermissions ( void  )

Definition at line 1435 of file slot.c.

1436{
1438 ereport(ERROR,
1439 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1440 errmsg("permission denied to use replication slots"),
1441 errdetail("Only roles with the %s attribute may use replication slots.",
1442 "REPLICATION")));
1443}
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
Oid GetUserId(void)
Definition: miscinit.c:520
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:739

References ereport, errcode(), errdetail(), errmsg(), ERROR, GetUserId(), and has_rolreplication().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), and pg_sync_replication_slots().

◆ CheckSlotRequirements()

void CheckSlotRequirements ( void  )

Definition at line 1413 of file slot.c.

1414{
1415 /*
1416 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1417 * needs the same check.
1418 */
1419
1420 if (max_replication_slots == 0)
1421 ereport(ERROR,
1422 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1423 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1424
1426 ereport(ERROR,
1427 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1428 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1429}
int wal_level
Definition: xlog.c:131
@ WAL_LEVEL_REPLICA
Definition: xlog.h:75

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckLogicalDecodingRequirements(), copy_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

◆ CreateSlotOnDisk()

static void CreateSlotOnDisk ( ReplicationSlot *  slot)
static

Definition at line 2150 of file slot.c.

2151{
2152 char tmppath[MAXPGPATH];
2153 char path[MAXPGPATH];
2154 struct stat st;
2155
2156 /*
2157 * No need to take out the io_in_progress_lock, nobody else can see this
2158 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2159 * takes out the lock, if we'd take the lock here, we'd deadlock.
2160 */
2161
2162 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2163 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2164
2165 /*
2166 * It's just barely possible that some previous effort to create or drop a
2167 * slot with this name left a temp directory lying around. If that seems
2168 * to be the case, try to remove it. If the rmtree() fails, we'll error
2169 * out at the MakePGDirectory() below, so we don't bother checking
2170 * success.
2171 */
2172 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2173 rmtree(tmppath, true);
2174
2175 /* Create and fsync the temporary slot directory. */
2176 if (MakePGDirectory(tmppath) < 0)
2177 ereport(ERROR,
2179 errmsg("could not create directory \"%s\": %m",
2180 tmppath)));
2181 fsync_fname(tmppath, true);
2182
2183 /* Write the actual state file. */
2184 slot->dirty = true; /* signal that we really need to write */
2185 SaveSlotToPath(slot, tmppath, ERROR);
2186
2187 /* Rename the directory into place. */
2188 if (rename(tmppath, path) != 0)
2189 ereport(ERROR,
2191 errmsg("could not rename file \"%s\" to \"%s\": %m",
2192 tmppath, path)));
2193
2194 /*
2195 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2196 * would persist after an OS crash or not - so, force a restart. The
2197 * restart would try to fsync this again till it works.
2198 */
2200
2201 fsync_fname(path, true);
2203
2205}
int errcode_for_file_access(void)
Definition: elog.c:876
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3936
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:755
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
#define stat
Definition: win32_port.h:274
#define S_ISDIR(m)
Definition: win32_port.h:315

References ReplicationSlot::data, ReplicationSlot::dirty, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg(), ERROR, fsync_fname(), MakePGDirectory(), MAXPGPATH, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, rmtree(), S_ISDIR, SaveSlotToPath(), sprintf, stat::st_mode, START_CRIT_SECTION, and stat.

Referenced by ReplicationSlotCreate().

◆ DetermineSlotInvalidationCause()

static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause ( uint32  possible_causes,
ReplicationSlot *  s,
XLogRecPtr  oldestLSN,
Oid  dboid,
TransactionId  snapshotConflictHorizon,
TransactionId  initial_effective_xmin,
TransactionId  initial_catalog_effective_xmin,
XLogRecPtr  initial_restart_lsn,
TimestampTz *  inactive_since,
TimestampTz  now 
)
static

Definition at line 1629 of file slot.c.

1636{
1637 Assert(possible_causes != RS_INVAL_NONE);
1638
1639 if (possible_causes & RS_INVAL_WAL_REMOVED)
1640 {
1641 if (initial_restart_lsn != InvalidXLogRecPtr &&
1642 initial_restart_lsn < oldestLSN)
1643 return RS_INVAL_WAL_REMOVED;
1644 }
1645
1646 if (possible_causes & RS_INVAL_HORIZON)
1647 {
1648 /* invalid DB oid signals a shared relation */
1649 if (SlotIsLogical(s) &&
1650 (dboid == InvalidOid || dboid == s->data.database))
1651 {
1652 if (TransactionIdIsValid(initial_effective_xmin) &&
1653 TransactionIdPrecedesOrEquals(initial_effective_xmin,
1654 snapshotConflictHorizon))
1655 return RS_INVAL_HORIZON;
1656 else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1657 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1658 snapshotConflictHorizon))
1659 return RS_INVAL_HORIZON;
1660 }
1661 }
1662
1663 if (possible_causes & RS_INVAL_WAL_LEVEL)
1664 {
1665 if (SlotIsLogical(s))
1666 return RS_INVAL_WAL_LEVEL;
1667 }
1668
1669 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1670 {
1671 Assert(now > 0);
1672
1673 if (CanInvalidateIdleSlot(s))
1674 {
1675 /*
1676 * We simulate the invalidation due to idle_timeout because the
1677 * minimum idle time is one minute, which would make tests take a
1678 * long time.
1679 */
1680#ifdef USE_INJECTION_POINTS
1681 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1682 {
1683 *inactive_since = 0; /* since the beginning of time */
1684 return RS_INVAL_IDLE_TIMEOUT;
1685 }
1686#endif
1687
1688 /*
1689 * Check if the slot needs to be invalidated due to
1690 * idle_replication_slot_timeout GUC.
1691 */
1694 {
1695 *inactive_since = s->inactive_since;
1696 return RS_INVAL_IDLE_TIMEOUT;
1697 }
1698 }
1699 }
1700
1701 return RS_INVAL_NONE;
1702}
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition: timestamp.c:1794
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
#define SECS_PER_MINUTE
Definition: timestamp.h:128
Assert(PointerIsAligned(start, uint64))
#define IS_INJECTION_POINT_ATTACHED(name)
#define InvalidOid
Definition: postgres_ext.h:37
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition: slot.c:1613
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:55
@ RS_INVAL_IDLE_TIMEOUT
Definition: slot.h:61
@ RS_INVAL_HORIZON
Definition: slot.h:57
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:59
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), CanInvalidateIdleSlot(), ReplicationSlot::data, ReplicationSlotPersistentData::database, idle_replication_slot_timeout_mins, ReplicationSlot::inactive_since, InvalidOid, InvalidXLogRecPtr, IS_INJECTION_POINT_ATTACHED, now(), RS_INVAL_HORIZON, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, RS_INVAL_WAL_REMOVED, SECS_PER_MINUTE, SlotIsLogical, TimestampDifferenceExceedsSeconds(), TransactionIdIsValid, and TransactionIdPrecedesOrEquals().

Referenced by InvalidatePossiblyObsoleteSlot().

◆ GetSlotInvalidationCause()

ReplicationSlotInvalidationCause GetSlotInvalidationCause ( const char *  cause_name)

Definition at line 2607 of file slot.c.

2608{
2609 Assert(cause_name);
2610
2611 /* Search lookup table for the cause having this name */
2612 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2613 {
2614 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2616 }
2617
2618 Assert(false);
2619 return RS_INVAL_NONE; /* to keep compiler quiet */
2620}
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition: slot.c:112
#define RS_INVAL_MAX_CAUSES
Definition: slot.h:65
ReplicationSlotInvalidationCause cause
Definition: slot.c:108

References Assert(), SlotInvalidationCauseMap::cause, i, RS_INVAL_MAX_CAUSES, RS_INVAL_NONE, and SlotInvalidationCauses.

Referenced by synchronize_slots().

◆ GetSlotInvalidationCauseName()

const char * GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause  cause)

Definition at line 2627 of file slot.c.

2628{
2629 /* Search lookup table for the name of this cause */
2630 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2631 {
2632 if (SlotInvalidationCauses[i].cause == cause)
2634 }
2635
2636 Assert(false);
2637 return "none"; /* to keep compiler quiet */
2638}
const char * cause_name
Definition: slot.c:109

References Assert(), SlotInvalidationCauseMap::cause_name, i, RS_INVAL_MAX_CAUSES, and SlotInvalidationCauses.

Referenced by pg_get_replication_slots(), and ReplicationSlotAcquire().

◆ InvalidateObsoleteReplicationSlots()

bool InvalidateObsoleteReplicationSlots ( uint32  possible_causes,
XLogSegNo  oldestSegno,
Oid  dboid,
TransactionId  snapshotConflictHorizon 
)

Definition at line 1976 of file slot.c.

1979{
1980 XLogRecPtr oldestLSN;
1981 bool invalidated = false;
1982
1983 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
1984 Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
1985 Assert(possible_causes != RS_INVAL_NONE);
1986
1987 if (max_replication_slots == 0)
1988 return invalidated;
1989
1990 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1991
1992restart:
1993 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1994 for (int i = 0; i < max_replication_slots; i++)
1995 {
1997
1998 if (!s->in_use)
1999 continue;
2000
2001 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2002 snapshotConflictHorizon,
2003 &invalidated))
2004 {
2005 /* if the lock was released, start from scratch */
2006 goto restart;
2007 }
2008 }
2009 LWLockRelease(ReplicationSlotControlLock);
2010
2011 /*
2012 * If any slots have been invalidated, recalculate the resource limits.
2013 */
2014 if (invalidated)
2015 {
2018 }
2019
2020 return invalidated;
2021}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1100
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1156
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
Definition: slot.c:1718
int wal_segment_size
Definition: xlog.c:143
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), i, ReplicationSlot::in_use, InvalidatePossiblyObsoleteSlot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, TransactionIdIsValid, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by CreateCheckPoint(), CreateRestartPoint(), ResolveRecoveryConflictWithSnapshot(), and xlog_redo().

◆ InvalidatePossiblyObsoleteSlot()

static bool InvalidatePossiblyObsoleteSlot ( uint32  possible_causes,
ReplicationSlot *  s,
XLogRecPtr  oldestLSN,
Oid  dboid,
TransactionId  snapshotConflictHorizon,
bool *  invalidated 
)
static

Definition at line 1718 of file slot.c.

1723{
1724 int last_signaled_pid = 0;
1725 bool released_lock = false;
1726 bool terminated = false;
1727 TransactionId initial_effective_xmin = InvalidTransactionId;
1728 TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
1729 XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
1731 TimestampTz inactive_since = 0;
1732
1733 for (;;)
1734 {
1735 XLogRecPtr restart_lsn;
1736 NameData slotname;
1737 int active_pid = 0;
1739 TimestampTz now = 0;
1740 long slot_idle_secs = 0;
1741
1742 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1743
1744 if (!s->in_use)
1745 {
1746 if (released_lock)
1747 LWLockRelease(ReplicationSlotControlLock);
1748 break;
1749 }
1750
1751 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1752 {
1753 /*
1754 * Assign the current time here to avoid system call overhead
1755 * while holding the spinlock in subsequent code.
1756 */
1758 }
1759
1760 /*
1761 * Check if the slot needs to be invalidated. If it needs to be
1762 * invalidated, and is not currently acquired, acquire it and mark it
1763 * as having been invalidated. We do this with the spinlock held to
1764 * avoid race conditions -- for example the restart_lsn could move
1765 * forward, or the slot could be dropped.
1766 */
1768
1769 restart_lsn = s->data.restart_lsn;
1770
1771 /* we do nothing if the slot is already invalid */
1772 if (s->data.invalidated == RS_INVAL_NONE)
1773 {
1774 /*
1775 * The slot's mutex will be released soon, and it is possible that
1776 * those values change since the process holding the slot has been
1777 * terminated (if any), so record them here to ensure that we
1778 * would report the correct invalidation cause.
1779 *
1780 * Unlike other slot attributes, slot's inactive_since can't be
1781 * changed until the acquired slot is released or the owning
1782 * process is terminated. So, the inactive slot can only be
1783 * invalidated immediately without being terminated.
1784 */
1785 if (!terminated)
1786 {
1787 initial_restart_lsn = s->data.restart_lsn;
1788 initial_effective_xmin = s->effective_xmin;
1789 initial_catalog_effective_xmin = s->effective_catalog_xmin;
1790 }
1791
1792 invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
1793 s, oldestLSN,
1794 dboid,
1795 snapshotConflictHorizon,
1796 initial_effective_xmin,
1797 initial_catalog_effective_xmin,
1798 initial_restart_lsn,
1799 &inactive_since,
1800 now);
1801 }
1802
1803 /*
1804 * The invalidation cause recorded previously should not change while
1805 * the process owning the slot (if any) has been terminated.
1806 */
1807 Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
1808 invalidation_cause_prev != invalidation_cause));
1809
1810 /* if there's no invalidation, we're done */
1811 if (invalidation_cause == RS_INVAL_NONE)
1812 {
1814 if (released_lock)
1815 LWLockRelease(ReplicationSlotControlLock);
1816 break;
1817 }
1818
1819 slotname = s->data.name;
1820 active_pid = s->active_pid;
1821
1822 /*
1823 * If the slot can be acquired, do so and mark it invalidated
1824 * immediately. Otherwise we'll signal the owning process, below, and
1825 * retry.
1826 */
1827 if (active_pid == 0)
1828 {
1830 s->active_pid = MyProcPid;
1831 s->data.invalidated = invalidation_cause;
1832
1833 /*
1834 * XXX: We should consider not overwriting restart_lsn and instead
1835 * just rely on .invalidated.
1836 */
1837 if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1839
1840 /* Let caller know */
1841 *invalidated = true;
1842 }
1843
1845
1846 /*
1847 * The logical replication slots shouldn't be invalidated as GUC
1848 * max_slot_wal_keep_size is set to -1 and
1849 * idle_replication_slot_timeout is set to 0 during the binary
1850 * upgrade. See check_old_cluster_for_valid_slots() where we ensure
1851 * that no slots were invalidated before the upgrade.
1852 */
1853 Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
1854
1855 /*
1856 * Calculate the idle time duration of the slot if slot is marked
1857 * invalidated with RS_INVAL_IDLE_TIMEOUT.
1858 */
1859 if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
1860 {
1861 int slot_idle_usecs;
1862
1863 TimestampDifference(inactive_since, now, &slot_idle_secs,
1864 &slot_idle_usecs);
1865 }
1866
1867 if (active_pid != 0)
1868 {
1869 /*
1870 * Prepare the sleep on the slot's condition variable before
1871 * releasing the lock, to close a possible race condition if the
1872 * slot is released before the sleep below.
1873 */
1875
1876 LWLockRelease(ReplicationSlotControlLock);
1877 released_lock = true;
1878
1879 /*
1880 * Signal to terminate the process that owns the slot, if we
1881 * haven't already signalled it. (Avoidance of repeated
1882 * signalling is the only reason for there to be a loop in this
1883 * routine; otherwise we could rely on caller's restart loop.)
1884 *
1885 * There is the race condition that other process may own the slot
1886 * after its current owner process is terminated and before this
1887 * process owns it. To handle that, we signal only if the PID of
1888 * the owning process has changed from the previous time. (This
1889 * logic assumes that the same PID is not reused very quickly.)
1890 */
1891 if (last_signaled_pid != active_pid)
1892 {
1893 ReportSlotInvalidation(invalidation_cause, true, active_pid,
1894 slotname, restart_lsn,
1895 oldestLSN, snapshotConflictHorizon,
1896 slot_idle_secs);
1897
1898 if (MyBackendType == B_STARTUP)
1899 (void) SendProcSignal(active_pid,
1902 else
1903 (void) kill(active_pid, SIGTERM);
1904
1905 last_signaled_pid = active_pid;
1906 terminated = true;
1907 invalidation_cause_prev = invalidation_cause;
1908 }
1909
1910 /* Wait until the slot is released. */
1912 WAIT_EVENT_REPLICATION_SLOT_DROP);
1913
1914 /*
1915 * Re-acquire lock and start over; we expect to invalidate the
1916 * slot next time (unless another process acquires the slot in the
1917 * meantime).
1918 */
1919 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1920 continue;
1921 }
1922 else
1923 {
1924 /*
1925 * We hold the slot now and have already invalidated it; flush it
1926 * to ensure that state persists.
1927 *
1928 * Don't want to hold ReplicationSlotControlLock across file
1929 * system operations, so release it now but be sure to tell caller
1930 * to restart from scratch.
1931 */
1932 LWLockRelease(ReplicationSlotControlLock);
1933 released_lock = true;
1934
1935 /* Make sure the invalidated state persists across server restart */
1939
1940 ReportSlotInvalidation(invalidation_cause, false, active_pid,
1941 slotname, restart_lsn,
1942 oldestLSN, snapshotConflictHorizon,
1943 slot_idle_secs);
1944
1945 /* done with this slot for now */
1946 break;
1947 }
1948 }
1949
1950 Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1951
1952 return released_lock;
1953}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1720
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:224
uint32 TransactionId
Definition: c.h:623
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
int MyProcPid
Definition: globals.c:46
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1967
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:2011
@ B_STARTUP
Definition: miscadmin.h:364
BackendType MyBackendType
Definition: miscinit.c:64
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:283
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition: procsignal.h:46
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1061
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TransactionId initial_effective_xmin, TransactionId initial_catalog_effective_xmin, XLogRecPtr initial_restart_lsn, TimestampTz *inactive_since, TimestampTz now)
Definition: slot.c:1629
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition: slot.c:1528
ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
void ReplicationSlotSave(void)
Definition: slot.c:1043
void ReplicationSlotRelease(void)
Definition: slot.c:686
ReplicationSlotInvalidationCause
Definition: slot.h:52
TransactionId effective_catalog_xmin
Definition: slot.h:182
pid_t active_pid
Definition: slot.h:164
TransactionId effective_xmin
Definition: slot.h:181
ConditionVariable active_cv
Definition: slot.h:191
Definition: c.h:712
#define InvalidTransactionId
Definition: transam.h:31
#define kill(pid, sig)
Definition: win32_port.h:493

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), B_STARTUP, ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DetermineSlotInvalidationCause(), ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, GetCurrentTimestamp(), ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, InvalidXLogRecPtr, IsBinaryUpgrade, kill, LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockHeldByMeInMode(), LWLockRelease(), ReplicationSlot::mutex, MyBackendType, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::name, now(), PG_USED_FOR_ASSERTS_ONLY, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReportSlotInvalidation(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SendProcSignal(), SlotIsLogical, SpinLockAcquire, SpinLockRelease, and TimestampDifference().

Referenced by InvalidateObsoleteReplicationSlots().

◆ ReplicationSlotAcquire()

void ReplicationSlotAcquire ( const char *  name,
bool  nowait,
bool  error_if_invalid 
)

Definition at line 559 of file slot.c.

560{
562 int active_pid;
563
564 Assert(name != NULL);
565
566retry:
567 Assert(MyReplicationSlot == NULL);
568
569 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
570
571 /* Check if the slot exists with the given name. */
573 if (s == NULL || !s->in_use)
574 {
575 LWLockRelease(ReplicationSlotControlLock);
576
578 (errcode(ERRCODE_UNDEFINED_OBJECT),
579 errmsg("replication slot \"%s\" does not exist",
580 name)));
581 }
582
583 /*
584 * This is the slot we want; check if it's active under some other
585 * process. In single user mode, we don't need this check.
586 */
588 {
589 /*
590 * Get ready to sleep on the slot in case it is active. (We may end
591 * up not sleeping, but we don't want to do this while holding the
592 * spinlock.)
593 */
594 if (!nowait)
596
597 /*
598 * It is important to reset the inactive_since under spinlock here to
599 * avoid race conditions with slot invalidation. See comments related
600 * to inactive_since in InvalidatePossiblyObsoleteSlot.
601 */
603 if (s->active_pid == 0)
605 active_pid = s->active_pid;
608 }
609 else
610 {
611 active_pid = MyProcPid;
613 }
614 LWLockRelease(ReplicationSlotControlLock);
615
616 /*
617 * If we found the slot but it's already active in another process, we
618 * wait until the owning process signals us that it's been released, or
619 * error out.
620 */
621 if (active_pid != MyProcPid)
622 {
623 if (!nowait)
624 {
625 /* Wait here until we get signaled, and then restart */
627 WAIT_EVENT_REPLICATION_SLOT_DROP);
629 goto retry;
630 }
631
633 (errcode(ERRCODE_OBJECT_IN_USE),
634 errmsg("replication slot \"%s\" is active for PID %d",
635 NameStr(s->data.name), active_pid)));
636 }
637 else if (!nowait)
638 ConditionVariableCancelSleep(); /* no sleep needed after all */
639
640 /* We made this slot active, so it's ours now. */
642
643 /*
644 * We need to check for invalidation after making the slot ours to avoid
645 * the possible race condition with the checkpointer that can otherwise
646 * invalidate the slot immediately after the check.
647 */
648 if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
650 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
651 errmsg("can no longer access replication slot \"%s\"",
652 NameStr(s->data.name)),
653 errdetail("This replication slot has been invalidated due to \"%s\".",
655
656 /* Let everybody know we've modified this slot */
658
659 /*
660 * The call to pgstat_acquire_replslot() protects against stats for a
661 * different slot, from before a restart or such, being present during
662 * pgstat_report_replslot().
663 */
664 if (SlotIsLogical(s))
666
667
668 if (am_walsender)
669 {
672 ? errmsg("acquired logical replication slot \"%s\"",
673 NameStr(s->data.name))
674 : errmsg("acquired physical replication slot \"%s\"",
675 NameStr(s->data.name)));
676 }
677}
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
bool IsUnderPostmaster
Definition: globals.c:119
void pgstat_acquire_replslot(ReplicationSlot *slot)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:479
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition: slot.c:2627
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition: slot.h:239
const char * name
bool am_walsender
Definition: walsender.c:116
bool log_replication_commands
Definition: walsender.c:126

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, am_walsender, Assert(), ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DEBUG1, ereport, errcode(), errdetail(), errmsg(), ERROR, GetSlotInvalidationCauseName(), ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, IsUnderPostmaster, LOG, log_replication_commands, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, pgstat_acquire_replslot(), ReplicationSlotSetInactiveSince(), RS_INVAL_NONE, SearchNamedReplicationSlot(), SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by binary_upgrade_logical_slot_has_caught_up(), drop_local_obsolete_slots(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), ReplicationSlotAlter(), ReplicationSlotDrop(), StartLogicalReplication(), StartReplication(), and synchronize_one_slot().

◆ ReplicationSlotAlter()

void ReplicationSlotAlter ( const char *  name,
const bool *  failover,
const bool *  two_phase 
)

Definition at line 837 of file slot.c.

839{
840 bool update_slot = false;
841
842 Assert(MyReplicationSlot == NULL);
843 Assert(failover || two_phase);
844
845 ReplicationSlotAcquire(name, false, true);
846
849 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
850 errmsg("cannot use %s with a physical replication slot",
851 "ALTER_REPLICATION_SLOT"));
852
853 if (RecoveryInProgress())
854 {
855 /*
856 * Do not allow users to alter the slots which are currently being
857 * synced from the primary to the standby.
858 */
861 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
862 errmsg("cannot alter replication slot \"%s\"", name),
863 errdetail("This replication slot is being synchronized from the primary server."));
864
865 /*
866 * Do not allow users to enable failover on the standby as we do not
867 * support sync to the cascading standby.
868 */
869 if (failover && *failover)
871 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
872 errmsg("cannot enable failover for a replication slot"
873 " on the standby"));
874 }
875
876 if (failover)
877 {
878 /*
879 * Do not allow users to enable failover for temporary slots as we do
880 * not support syncing temporary slots to the standby.
881 */
882 if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
884 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
885 errmsg("cannot enable failover for a temporary replication slot"));
886
887 if (MyReplicationSlot->data.failover != *failover)
888 {
890 MyReplicationSlot->data.failover = *failover;
892
893 update_slot = true;
894 }
895 }
896
898 {
902
903 update_slot = true;
904 }
905
906 if (update_slot)
907 {
910 }
911
913}
static bool two_phase
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:559
@ RS_TEMPORARY
Definition: slot.h:40
#define SlotIsPhysical(slot)
Definition: slot.h:220
ReplicationSlotPersistency persistency
Definition: slot.h:81

References Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, ReplicationSlotPersistentData::failover, ReplicationSlot::mutex, MyReplicationSlot, name, ReplicationSlotPersistentData::persistency, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), RS_TEMPORARY, SlotIsPhysical, SpinLockAcquire, SpinLockRelease, ReplicationSlotPersistentData::synced, two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by AlterReplicationSlot().

◆ ReplicationSlotCleanup()

void ReplicationSlotCleanup ( bool  synced_only)

Definition at line 775 of file slot.c.

776{
777 int i;
778
779 Assert(MyReplicationSlot == NULL);
780
781restart:
782 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
783 for (i = 0; i < max_replication_slots; i++)
784 {
786
787 if (!s->in_use)
788 continue;
789
791 if ((s->active_pid == MyProcPid &&
792 (!synced_only || s->data.synced)))
793 {
796 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
797
799
801 goto restart;
802 }
803 else
805 }
806
807 LWLockRelease(ReplicationSlotControlLock);
808}
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:936

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropPtr(), RS_TEMPORARY, SpinLockAcquire, SpinLockRelease, and ReplicationSlotPersistentData::synced.

Referenced by PostgresMain(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), SyncReplicationSlots(), and WalSndErrorCleanup().

◆ ReplicationSlotCreate()

void ReplicationSlotCreate ( const char *  name,
bool  db_specific,
ReplicationSlotPersistency  persistency,
bool  two_phase,
bool  failover,
bool  synced 
)

Definition at line 324 of file slot.c.

327{
328 ReplicationSlot *slot = NULL;
329 int i;
330
331 Assert(MyReplicationSlot == NULL);
332
334
335 if (failover)
336 {
337 /*
338 * Do not allow users to create the failover enabled slots on the
339 * standby as we do not support sync to the cascading standby.
340 *
341 * However, failover enabled slots can be created during slot
342 * synchronization because we need to retain the same values as the
343 * remote slot.
344 */
347 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
348 errmsg("cannot enable failover for a replication slot created on the standby"));
349
350 /*
351 * Do not allow users to create failover enabled temporary slots,
352 * because temporary slots will not be synced to the standby.
353 *
354 * However, failover enabled temporary slots can be created during
355 * slot synchronization. See the comments atop slotsync.c for details.
356 */
357 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
359 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
360 errmsg("cannot enable failover for a temporary replication slot"));
361 }
362
363 /*
364 * If some other backend ran this code concurrently with us, we'd likely
365 * both allocate the same slot, and that would be bad. We'd also be at
366 * risk of missing a name collision. Also, we don't want to try to create
367 * a new slot while somebody's busy cleaning up an old one, because we
368 * might both be monkeying with the same directory.
369 */
370 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
371
372 /*
373 * Check for name collision, and identify an allocatable slot. We need to
374 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
375 * else can change the in_use flags while we're looking at them.
376 */
377 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
378 for (i = 0; i < max_replication_slots; i++)
379 {
381
382 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
385 errmsg("replication slot \"%s\" already exists", name)));
386 if (!s->in_use && slot == NULL)
387 slot = s;
388 }
389 LWLockRelease(ReplicationSlotControlLock);
390
391 /* If all slots are in use, we're out of luck. */
392 if (slot == NULL)
394 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
395 errmsg("all replication slots are in use"),
396 errhint("Free one or increase \"max_replication_slots\".")));
397
398 /*
399 * Since this slot is not in use, nobody should be looking at any part of
400 * it other than the in_use field unless they're trying to allocate it.
401 * And since we hold ReplicationSlotAllocationLock, nobody except us can
402 * be doing that. So it's safe to initialize the slot.
403 */
404 Assert(!slot->in_use);
405 Assert(slot->active_pid == 0);
406
407 /* first initialize persistent data */
408 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
409 namestrcpy(&slot->data.name, name);
410 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
411 slot->data.persistency = persistency;
412 slot->data.two_phase = two_phase;
414 slot->data.failover = failover;
415 slot->data.synced = synced;
416
417 /* and then data only present in shared memory */
418 slot->just_dirtied = false;
419 slot->dirty = false;
427 slot->inactive_since = 0;
428
429 /*
430 * Create the slot on disk. We haven't actually marked the slot allocated
431 * yet, so no special cleanup is required if this errors out.
432 */
433 CreateSlotOnDisk(slot);
434
435 /*
436 * We need to briefly prevent any other backend from iterating over the
437 * slots while we flip the in_use flag. We also need to set the active
438 * flag while holding the ControlLock as otherwise a concurrent
439 * ReplicationSlotAcquire() could acquire the slot as well.
440 */
441 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
442
443 slot->in_use = true;
444
445 /* We can now mark the slot active, and that makes it our slot. */
446 SpinLockAcquire(&slot->mutex);
447 Assert(slot->active_pid == 0);
448 slot->active_pid = MyProcPid;
449 SpinLockRelease(&slot->mutex);
450 MyReplicationSlot = slot;
451
452 LWLockRelease(ReplicationSlotControlLock);
453
454 /*
455 * Create statistics entry for the new logical slot. We don't collect any
456 * stats for physical slots, so no need to create an entry for the same.
457 * See ReplicationSlotDropPtr for why we need to do this before releasing
458 * ReplicationSlotAllocationLock.
459 */
460 if (SlotIsLogical(slot))
462
463 /*
464 * Now that the slot has been marked as in_use and active, it's safe to
465 * let somebody else try to allocate a slot.
466 */
467 LWLockRelease(ReplicationSlotAllocationLock);
468
469 /* Let everybody know we've modified this slot */
471}
int errhint(const char *fmt,...)
Definition: elog.c:1317
Oid MyDatabaseId
Definition: globals.c:93
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void pgstat_create_replslot(ReplicationSlot *slot)
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:2150
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:267
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1647
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:201
XLogRecPtr candidate_restart_valid
Definition: slot.h:202
XLogRecPtr candidate_restart_lsn
Definition: slot.h:203
TransactionId candidate_catalog_xmin
Definition: slot.h:200

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ConditionVariableBroadcast(), CreateSlotOnDisk(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::dirty, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, ReplicationSlotPersistentData::failover, i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsSyncingReplicationSlots(), ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyDatabaseId, MyProcPid, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, pgstat_create_replslot(), RecoveryInProgress(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotValidateName(), RS_TEMPORARY, SlotIsLogical, SpinLockAcquire, SpinLockRelease, ReplicationSlotPersistentData::synced, two_phase, ReplicationSlotPersistentData::two_phase, and ReplicationSlotPersistentData::two_phase_at.

Referenced by create_logical_replication_slot(), create_physical_replication_slot(), CreateReplicationSlot(), and synchronize_one_slot().

◆ ReplicationSlotDrop()

void ReplicationSlotDrop ( const char *  name,
bool  nowait 
)

Definition at line 814 of file slot.c.

815{
816 Assert(MyReplicationSlot == NULL);
817
818 ReplicationSlotAcquire(name, nowait, false);
819
820 /*
821 * Do not allow users to drop the slots which are currently being synced
822 * from the primary to the standby.
823 */
826 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
827 errmsg("cannot drop replication slot \"%s\"", name),
828 errdetail("This replication slot is being synchronized from the primary server."));
829
831}
void ReplicationSlotDropAcquired(void)
Definition: slot.c:919

References Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, MyReplicationSlot, name, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), and ReplicationSlotPersistentData::synced.

Referenced by DropReplicationSlot(), and pg_drop_replication_slot().

◆ ReplicationSlotDropAcquired()

void ReplicationSlotDropAcquired ( void  )

Definition at line 919 of file slot.c.

920{
922
923 Assert(MyReplicationSlot != NULL);
924
925 /* slot isn't acquired anymore */
926 MyReplicationSlot = NULL;
927
929}

References Assert(), MyReplicationSlot, and ReplicationSlotDropPtr().

Referenced by drop_local_obsolete_slots(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ ReplicationSlotDropPtr()

static void ReplicationSlotDropPtr ( ReplicationSlot slot)
static

Definition at line 936 of file slot.c.

937{
938 char path[MAXPGPATH];
939 char tmppath[MAXPGPATH];
940
941 /*
942 * If some other backend ran this code concurrently with us, we might try
943 * to delete a slot with a certain name while someone else was trying to
944 * create a slot with the same name.
945 */
946 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
947
948 /* Generate pathnames. */
949 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
950 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
951
952 /*
953 * Rename the slot directory on disk, so that we'll no longer recognize
954 * this as a valid slot. Note that if this fails, we've got to mark the
955 * slot inactive before bailing out. If we're dropping an ephemeral or a
956 * temporary slot, we better never fail hard as the caller won't expect
957 * the slot to survive and this might get called during error handling.
958 */
959 if (rename(path, tmppath) == 0)
960 {
961 /*
962 * We need to fsync() the directory we just renamed and its parent to
963 * make sure that our changes are on disk in a crash-safe fashion. If
964 * fsync() fails, we can't be sure whether the changes are on disk or
965 * not. For now, we handle that by panicking;
966 * StartupReplicationSlots() will try to straighten it out after
967 * restart.
968 */
970 fsync_fname(tmppath, true);
973 }
974 else
975 {
976 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
977
978 SpinLockAcquire(&slot->mutex);
979 slot->active_pid = 0;
980 SpinLockRelease(&slot->mutex);
981
982 /* wake up anyone waiting on this slot */
984
985 ereport(fail_softly ? WARNING : ERROR,
987 errmsg("could not rename file \"%s\" to \"%s\": %m",
988 path, tmppath)));
989 }
990
991 /*
992 * The slot is definitely gone. Lock out concurrent scans of the array
993 * long enough to kill it. It's OK to clear the active PID here without
994 * grabbing the mutex because nobody else can be scanning the array here,
995 * and nobody can be attached to this slot and thus access it without
996 * scanning the array.
997 *
998 * Also wake up processes waiting for it.
999 */
1000 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1001 slot->active_pid = 0;
1002 slot->in_use = false;
1003 LWLockRelease(ReplicationSlotControlLock);
1005
1006 /*
1007 * Slot is dead and doesn't prevent resource removal anymore, recompute
1008 * limits.
1009 */
1012
1013 /*
1014 * If removing the directory fails, the worst thing that will happen is
1015 * that the user won't be able to create a new slot with the same name
1016 * until the next server restart. We warn about it, but that's all.
1017 */
1018 if (!rmtree(tmppath, true))
1020 (errmsg("could not remove directory \"%s\"", tmppath)));
1021
1022 /*
1023 * Drop the statistics entry for the replication slot. Do this while
1024 * holding ReplicationSlotAllocationLock so that we don't drop a
1025 * statistics entry for another slot with the same name just created in
1026 * another session.
1027 */
1028 if (SlotIsLogical(slot))
1030
1031 /*
1032 * We release this at the very end, so that nobody starts trying to create
1033 * a slot while we're still cleaning up the detritus of the old one.
1034 */
1035 LWLockRelease(ReplicationSlotAllocationLock);
1036}
#define WARNING
Definition: elog.h:36
void pgstat_drop_replslot(ReplicationSlot *slot)
@ RS_PERSISTENT
Definition: slot.h:38

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, ConditionVariableBroadcast(), ReplicationSlot::data, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg(), ERROR, fsync_fname(), ReplicationSlot::in_use, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotPersistentData::persistency, PG_REPLSLOT_DIR, pgstat_drop_replslot(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), rmtree(), RS_PERSISTENT, SlotIsLogical, SpinLockAcquire, SpinLockRelease, sprintf, START_CRIT_SECTION, and WARNING.

Referenced by ReplicationSlotCleanup(), and ReplicationSlotDropAcquired().

◆ ReplicationSlotIndex()

◆ ReplicationSlotInitialize()

void ReplicationSlotInitialize ( void  )

Definition at line 239 of file slot.c.

240{
242}
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:248

References before_shmem_exit(), and ReplicationSlotShmemExit().

Referenced by BaseInit().

◆ ReplicationSlotMarkDirty()

◆ ReplicationSlotName()

bool ReplicationSlotName ( int  index,
Name  name 
)

Definition at line 528 of file slot.c.

529{
530 ReplicationSlot *slot;
531 bool found;
532
534
535 /*
536 * Ensure that the slot cannot be dropped while we copy the name. Don't
537 * need the spinlock as the name of an existing slot cannot change.
538 */
539 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
540 found = slot->in_use;
541 if (slot->in_use)
543 LWLockRelease(ReplicationSlotControlLock);
544
545 return found;
546}
Definition: type.h:96

References ReplicationSlot::data, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by pgstat_replslot_to_serialized_name_cb().

◆ ReplicationSlotPersist()

◆ ReplicationSlotRelease()

void ReplicationSlotRelease ( void  )

Definition at line 686 of file slot.c.

687{
689 char *slotname = NULL; /* keep compiler quiet */
690 bool is_logical = false; /* keep compiler quiet */
691 TimestampTz now = 0;
692
693 Assert(slot != NULL && slot->active_pid != 0);
694
695 if (am_walsender)
696 {
697 slotname = pstrdup(NameStr(slot->data.name));
698 is_logical = SlotIsLogical(slot);
699 }
700
701 if (slot->data.persistency == RS_EPHEMERAL)
702 {
703 /*
704 * Delete the slot. There is no !PANIC case where this is allowed to
705 * fail, all that may happen is an incomplete cleanup of the on-disk
706 * data.
707 */
709 }
710
711 /*
712 * If slot needed to temporarily restrain both data and catalog xmin to
713 * create the catalog snapshot, remove that temporary constraint.
714 * Snapshots can only be exported while the initial snapshot is still
715 * acquired.
716 */
717 if (!TransactionIdIsValid(slot->data.xmin) &&
719 {
720 SpinLockAcquire(&slot->mutex);
722 SpinLockRelease(&slot->mutex);
724 }
725
726 /*
727 * Set the time since the slot has become inactive. We get the current
728 * time beforehand to avoid a system call while holding the spinlock.
729 */
731
732 if (slot->data.persistency == RS_PERSISTENT)
733 {
734 /*
735 * Mark persistent slot inactive. We're not freeing it, just
736 * disconnecting, but wake up others that may be waiting for it.
737 */
738 SpinLockAcquire(&slot->mutex);
739 slot->active_pid = 0;
741 SpinLockRelease(&slot->mutex);
743 }
744 else
746
747 MyReplicationSlot = NULL;
748
749 /* might not have been set when we've been a plain slot */
750 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
751 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
753 LWLockRelease(ProcArrayLock);
754
755 if (am_walsender)
756 {
758 is_logical
759 ? errmsg("released logical replication slot \"%s\"",
760 slotname)
761 : errmsg("released physical replication slot \"%s\"",
762 slotname));
763
764 pfree(slotname);
765 }
766}
@ RS_EPHEMERAL
Definition: slot.h:39
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
uint8 statusFlags
Definition: proc.h:243
int pgxactoff
Definition: proc.h:185
uint8 * statusFlags
Definition: proc.h:387
TransactionId xmin
Definition: slot.h:89

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, am_walsender, Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, ereport, errmsg(), GetCurrentTimestamp(), InvalidTransactionId, LOG, log_replication_commands, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, now(), ReplicationSlotPersistentData::persistency, pfree(), PGPROC::pgxactoff, ProcGlobal, pstrdup(), ReplicationSlotDropAcquired(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotSetInactiveSince(), RS_EPHEMERAL, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire, SpinLockRelease, PGPROC::statusFlags, PROC_HDR::statusFlags, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by binary_upgrade_logical_slot_has_caught_up(), copy_replication_slot(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), PostgresMain(), ReplicationSlotAlter(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), synchronize_one_slot(), and WalSndErrorCleanup().

◆ ReplicationSlotReserveWal()

void ReplicationSlotReserveWal ( void  )

Definition at line 1452 of file slot.c.

1453{
1455
1456 Assert(slot != NULL);
1458
1459 /*
1460 * The replication slot mechanism is used to prevent removal of required
1461 * WAL. As there is no interlock between this routine and checkpoints, WAL
1462 * segments could concurrently be removed when a now stale return value of
1463 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1464 * this happens we'll just retry.
1465 */
1466 while (true)
1467 {
1468 XLogSegNo segno;
1469 XLogRecPtr restart_lsn;
1470
1471 /*
1472 * For logical slots log a standby snapshot and start logical decoding
1473 * at exactly that position. That allows the slot to start up more
1474 * quickly. But on a standby we cannot do WAL writes, so just use the
1475 * replay pointer; effectively, an attempt to create a logical slot on
1476 * standby will cause it to wait for an xl_running_xact record to be
1477 * logged independently on the primary, so that a snapshot can be
1478 * built using the record.
1479 *
1480 * None of this is needed (or indeed helpful) for physical slots as
1481 * they'll start replay at the last logged checkpoint anyway. Instead
1482 * return the location of the last redo LSN. While that slightly
1483 * increases the chance that we have to retry, it's where a base
1484 * backup has to start replay at.
1485 */
1486 if (SlotIsPhysical(slot))
1487 restart_lsn = GetRedoRecPtr();
1488 else if (RecoveryInProgress())
1489 restart_lsn = GetXLogReplayRecPtr(NULL);
1490 else
1491 restart_lsn = GetXLogInsertRecPtr();
1492
1493 SpinLockAcquire(&slot->mutex);
1494 slot->data.restart_lsn = restart_lsn;
1495 SpinLockRelease(&slot->mutex);
1496
1497 /* prevent WAL removal as fast as possible */
1499
1500 /*
1501 * If all required WAL is still there, great, otherwise retry. The
1502 * slot should prevent further removal of WAL, unless there's a
1503 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1504 * the new restart_lsn above, so normally we should never need to loop
1505 * more than twice.
1506 */
1508 if (XLogGetLastRemovedSegno() < segno)
1509 break;
1510 }
1511
1512 if (!RecoveryInProgress() && SlotIsLogical(slot))
1513 {
1514 XLogRecPtr flushptr;
1515
1516 /* make sure we have enough information to start */
1517 flushptr = LogStandbySnapshot();
1518
1519 /* and make sure it's fsynced to disk */
1520 XLogFlush(flushptr);
1521 }
1522}
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1281
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3764
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6483
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9470
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2790
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References Assert(), ReplicationSlot::data, GetRedoRecPtr(), GetXLogInsertRecPtr(), GetXLogReplayRecPtr(), InvalidXLogRecPtr, LogStandbySnapshot(), ReplicationSlot::mutex, MyReplicationSlot, RecoveryInProgress(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SlotIsLogical, SlotIsPhysical, SpinLockAcquire, SpinLockRelease, wal_segment_size, XLByteToSeg, XLogFlush(), and XLogGetLastRemovedSegno().

Referenced by create_physical_replication_slot(), CreateInitDecodingContext(), and CreateReplicationSlot().

◆ ReplicationSlotSave()

◆ ReplicationSlotsComputeLogicalRestartLSN()

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN ( void  )

Definition at line 1205 of file slot.c.

1206{
1208 int i;
1209
1210 if (max_replication_slots <= 0)
1211 return InvalidXLogRecPtr;
1212
1213 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1214
1215 for (i = 0; i < max_replication_slots; i++)
1216 {
1217 ReplicationSlot *s;
1218 XLogRecPtr restart_lsn;
1219 bool invalidated;
1220
1222
1223 /* cannot change while ReplicationSlotControlLock is held */
1224 if (!s->in_use)
1225 continue;
1226
1227 /* we're only interested in logical slots */
1228 if (!SlotIsLogical(s))
1229 continue;
1230
1231 /* read once, it's ok if it increases while we're checking */
1233 restart_lsn = s->data.restart_lsn;
1234 invalidated = s->data.invalidated != RS_INVAL_NONE;
1236
1237 /* invalidated slots need not apply */
1238 if (invalidated)
1239 continue;
1240
1241 if (restart_lsn == InvalidXLogRecPtr)
1242 continue;
1243
1244 if (result == InvalidXLogRecPtr ||
1245 restart_lsn < result)
1246 result = restart_lsn;
1247 }
1248
1249 LWLockRelease(ReplicationSlotControlLock);
1250
1251 return result;
1252}

References ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by CheckPointLogicalRewriteHeap(), and CheckPointSnapBuild().

◆ ReplicationSlotsComputeRequiredLSN()

void ReplicationSlotsComputeRequiredLSN ( void  )

Definition at line 1156 of file slot.c.

1157{
1158 int i;
1159 XLogRecPtr min_required = InvalidXLogRecPtr;
1160
1161 Assert(ReplicationSlotCtl != NULL);
1162
1163 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1164 for (i = 0; i < max_replication_slots; i++)
1165 {
1167 XLogRecPtr restart_lsn;
1168 bool invalidated;
1169
1170 if (!s->in_use)
1171 continue;
1172
1174 restart_lsn = s->data.restart_lsn;
1175 invalidated = s->data.invalidated != RS_INVAL_NONE;
1177
1178 /* invalidated slots need not apply */
1179 if (invalidated)
1180 continue;
1181
1182 if (restart_lsn != InvalidXLogRecPtr &&
1183 (min_required == InvalidXLogRecPtr ||
1184 restart_lsn < min_required))
1185 min_required = restart_lsn;
1186 }
1187 LWLockRelease(ReplicationSlotControlLock);
1188
1190}
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2676

References Assert(), ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SpinLockAcquire, SpinLockRelease, and XLogSetReplicationSlotMinimumLSN().

Referenced by copy_replication_slot(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), ReplicationSlotDropPtr(), ReplicationSlotReserveWal(), reserve_wal_for_local_slot(), StartupReplicationSlots(), and update_local_synced_slot().

◆ ReplicationSlotsComputeRequiredXmin()

void ReplicationSlotsComputeRequiredXmin ( bool  already_locked)

Definition at line 1100 of file slot.c.

1101{
1102 int i;
1104 TransactionId agg_catalog_xmin = InvalidTransactionId;
1105
1106 Assert(ReplicationSlotCtl != NULL);
1107
1108 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1109
1110 for (i = 0; i < max_replication_slots; i++)
1111 {
1113 TransactionId effective_xmin;
1114 TransactionId effective_catalog_xmin;
1115 bool invalidated;
1116
1117 if (!s->in_use)
1118 continue;
1119
1121 effective_xmin = s->effective_xmin;
1122 effective_catalog_xmin = s->effective_catalog_xmin;
1123 invalidated = s->data.invalidated != RS_INVAL_NONE;
1125
1126 /* invalidated slots need not apply */
1127 if (invalidated)
1128 continue;
1129
1130 /* check the data xmin */
1131 if (TransactionIdIsValid(effective_xmin) &&
1132 (!TransactionIdIsValid(agg_xmin) ||
1133 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1134 agg_xmin = effective_xmin;
1135
1136 /* check the catalog xmin */
1137 if (TransactionIdIsValid(effective_catalog_xmin) &&
1138 (!TransactionIdIsValid(agg_catalog_xmin) ||
1139 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1140 agg_catalog_xmin = effective_catalog_xmin;
1141 }
1142
1143 LWLockRelease(ReplicationSlotControlLock);
1144
1145 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1146}
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3943
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References Assert(), ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ProcArraySetReplicationSlotXmin(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by copy_replication_slot(), CreateInitDecodingContext(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalReplicationSlotNewXmin(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), StartupReplicationSlots(), synchronize_one_slot(), and update_local_synced_slot().

◆ ReplicationSlotsCountDBSlots()

bool ReplicationSlotsCountDBSlots ( Oid  dboid,
int *  nslots,
int *  nactive 
)

Definition at line 1263 of file slot.c.

1264{
1265 int i;
1266
1267 *nslots = *nactive = 0;
1268
1269 if (max_replication_slots <= 0)
1270 return false;
1271
1272 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1273 for (i = 0; i < max_replication_slots; i++)
1274 {
1275 ReplicationSlot *s;
1276
1278
1279 /* cannot change while ReplicationSlotControlLock is held */
1280 if (!s->in_use)
1281 continue;
1282
1283 /* only logical slots are database specific, skip */
1284 if (!SlotIsLogical(s))
1285 continue;
1286
1287 /* not our database, skip */
1288 if (s->data.database != dboid)
1289 continue;
1290
1291 /* NB: intentionally counting invalidated slots */
1292
1293 /* count slots with spinlock held */
1295 (*nslots)++;
1296 if (s->active_pid != 0)
1297 (*nactive)++;
1299 }
1300 LWLockRelease(ReplicationSlotControlLock);
1301
1302 if (*nslots > 0)
1303 return true;
1304 return false;
1305}

References ReplicationSlot::active_pid, ReplicationSlot::data, ReplicationSlotPersistentData::database, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by dropdb().

◆ ReplicationSlotsDropDBSlots()

void ReplicationSlotsDropDBSlots ( Oid  dboid)

Definition at line 1321 of file slot.c.

1322{
1323 int i;
1324
1325 if (max_replication_slots <= 0)
1326 return;
1327
1328restart:
1329 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1330 for (i = 0; i < max_replication_slots; i++)
1331 {
1332 ReplicationSlot *s;
1333 char *slotname;
1334 int active_pid;
1335
1337
1338 /* cannot change while ReplicationSlotControlLock is held */
1339 if (!s->in_use)
1340 continue;
1341
1342 /* only logical slots are database specific, skip */
1343 if (!SlotIsLogical(s))
1344 continue;
1345
1346 /* not our database, skip */
1347 if (s->data.database != dboid)
1348 continue;
1349
1350 /* NB: intentionally including invalidated slots */
1351
1352 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1354 /* can't change while ReplicationSlotControlLock is held */
1355 slotname = NameStr(s->data.name);
1356 active_pid = s->active_pid;
1357 if (active_pid == 0)
1358 {
1360 s->active_pid = MyProcPid;
1361 }
1363
1364 /*
1365 * Even though we hold an exclusive lock on the database object a
1366 * logical slot for that DB can still be active, e.g. if it's
1367 * concurrently being dropped by a backend connected to another DB.
1368 *
1369 * That's fairly unlikely in practice, so we'll just bail out.
1370 *
1371 * The slot sync worker holds a shared lock on the database before
1372 * operating on synced logical slots to avoid conflict with the drop
1373 * happening here. The persistent synced slots are thus safe but there
1374 * is a possibility that the slot sync worker has created a temporary
1375 * slot (which stays active even on release) and we are trying to drop
1376 * that here. In practice, the chances of hitting this scenario are
1377 * low, because during slot synchronization the temporary slot is
1378 * immediately converted to persistent and thus is safe due to the
1379 * shared lock taken on the database. So, we'll just bail out in such
1380 * a case.
1381 *
1382 * XXX: We can consider shutting down the slot sync worker before
1383 * trying to drop synced temporary slots here.
1384 */
1385 if (active_pid)
1386 ereport(ERROR,
1387 (errcode(ERRCODE_OBJECT_IN_USE),
1388 errmsg("replication slot \"%s\" is active for PID %d",
1389 slotname, active_pid)));
1390
1391 /*
1392 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1393 * holding ReplicationSlotControlLock over filesystem operations,
1394 * release ReplicationSlotControlLock and use
1395 * ReplicationSlotDropAcquired.
1396 *
1397 * As that means the set of slots could change, restart scan from the
1398 * beginning each time we release the lock.
1399 */
1400 LWLockRelease(ReplicationSlotControlLock);
1402 goto restart;
1403 }
1404 LWLockRelease(ReplicationSlotControlLock);
1405}

References ReplicationSlot::active_pid, ReplicationSlot::data, ReplicationSlotPersistentData::database, ereport, errcode(), errmsg(), ERROR, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropAcquired(), SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by dbase_redo(), and dropdb().

◆ ReplicationSlotShmemExit()

static void ReplicationSlotShmemExit ( int  code,
Datum  arg 
)
static

Definition at line 248 of file slot.c.

249{
250 /* Make sure active replication slots are released */
251 if (MyReplicationSlot != NULL)
253
254 /* Also cleanup all the temporary slots. */
256}
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:775

References MyReplicationSlot, ReplicationSlotCleanup(), and ReplicationSlotRelease().

Referenced by ReplicationSlotInitialize().

◆ ReplicationSlotsShmemInit()

void ReplicationSlotsShmemInit ( void  )

Definition at line 204 of file slot.c.

205{
206 bool found;
207
208 if (max_replication_slots == 0)
209 return;
210
212 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
213 &found);
214
215 if (!found)
216 {
217 int i;
218
219 /* First time through, so initialize */
221
222 for (i = 0; i < max_replication_slots; i++)
223 {
225
226 /* everything else is zeroed by the memset above */
227 SpinLockInit(&slot->mutex);
231 }
232 }
233}
#define MemSet(start, val, len)
Definition: c.h:991
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:718
@ LWTRANCHE_REPLICATION_SLOT_IO
Definition: lwlock.h:191
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:186
#define SpinLockInit(lock)
Definition: spin.h:57
LWLock io_in_progress_lock
Definition: slot.h:188

References ReplicationSlot::active_cv, ConditionVariableInit(), i, ReplicationSlot::io_in_progress_lock, LWLockInitialize(), LWTRANCHE_REPLICATION_SLOT_IO, max_replication_slots, MemSet, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsShmemSize(), ShmemInitStruct(), and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationSlotsShmemSize()

Size ReplicationSlotsShmemSize ( void  )

Definition at line 186 of file slot.c.

187{
188 Size size = 0;
189
190 if (max_replication_slots == 0)
191 return size;
192
193 size = offsetof(ReplicationSlotCtlData, replication_slots);
194 size = add_size(size,
196
197 return size;
198}
size_t Size
Definition: c.h:576
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505

References add_size(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationSlotsShmemInit().

◆ ReplicationSlotValidateName()

bool ReplicationSlotValidateName ( const char *  name,
int  elevel 
)

Definition at line 267 of file slot.c.

268{
269 const char *cp;
270
271 if (strlen(name) == 0)
272 {
273 ereport(elevel,
274 (errcode(ERRCODE_INVALID_NAME),
275 errmsg("replication slot name \"%s\" is too short",
276 name)));
277 return false;
278 }
279
280 if (strlen(name) >= NAMEDATALEN)
281 {
282 ereport(elevel,
283 (errcode(ERRCODE_NAME_TOO_LONG),
284 errmsg("replication slot name \"%s\" is too long",
285 name)));
286 return false;
287 }
288
289 for (cp = name; *cp; cp++)
290 {
291 if (!((*cp >= 'a' && *cp <= 'z')
292 || (*cp >= '0' && *cp <= '9')
293 || (*cp == '_')))
294 {
295 ereport(elevel,
296 (errcode(ERRCODE_INVALID_NAME),
297 errmsg("replication slot name \"%s\" contains invalid character",
298 name),
299 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
300 return false;
301 }
302 }
303 return true;
304}
#define NAMEDATALEN

References ereport, errcode(), errhint(), errmsg(), name, and NAMEDATALEN.

Referenced by check_primary_slot_name(), parse_subscription_options(), ReplicationSlotCreate(), and StartupReorderBuffer().

◆ ReportSlotInvalidation()

static void ReportSlotInvalidation ( ReplicationSlotInvalidationCause  cause,
bool  terminating,
int  pid,
NameData  slotname,
XLogRecPtr  restart_lsn,
XLogRecPtr  oldestLSN,
TransactionId  snapshotConflictHorizon,
long  slot_idle_seconds 
)
static

Definition at line 1528 of file slot.c.

1536{
1537 StringInfoData err_detail;
1538 StringInfoData err_hint;
1539
1540 initStringInfo(&err_detail);
1541 initStringInfo(&err_hint);
1542
1543 switch (cause)
1544 {
1546 {
1547 unsigned long long ex = oldestLSN - restart_lsn;
1548
1549 appendStringInfo(&err_detail,
1550 ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1551 "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1552 ex),
1553 LSN_FORMAT_ARGS(restart_lsn),
1554 ex);
1555 /* translator: %s is a GUC variable name */
1556 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1557 "max_slot_wal_keep_size");
1558 break;
1559 }
1560 case RS_INVAL_HORIZON:
1561 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1562 snapshotConflictHorizon);
1563 break;
1564
1565 case RS_INVAL_WAL_LEVEL:
1566 appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1567 break;
1568
1570 {
1571 int minutes = slot_idle_seconds / SECS_PER_MINUTE;
1572 int secs = slot_idle_seconds % SECS_PER_MINUTE;
1573
1574 /* translator: %s is a GUC variable name */
1575 appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."),
1576 minutes, secs, "idle_replication_slot_timeout",
1578 /* translator: %s is a GUC variable name */
1579 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1580 "idle_replication_slot_timeout");
1581 break;
1582 }
1583 case RS_INVAL_NONE:
1585 }
1586
1587 ereport(LOG,
1588 terminating ?
1589 errmsg("terminating process %d to release replication slot \"%s\"",
1590 pid, NameStr(slotname)) :
1591 errmsg("invalidating obsolete replication slot \"%s\"",
1592 NameStr(slotname)),
1593 errdetail_internal("%s", err_detail.data),
1594 err_hint.len ? errhint("%s", err_hint.data) : 0);
1595
1596 pfree(err_detail.data);
1597 pfree(err_hint.data);
1598}
#define ngettext(s, p, n)
Definition: c.h:1152
#define pg_unreachable()
Definition: c.h:332
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
#define _(x)
Definition: elog.c:90
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References _, appendStringInfo(), appendStringInfoString(), StringInfoData::data, ereport, errdetail_internal(), errhint(), errmsg(), idle_replication_slot_timeout_mins, initStringInfo(), StringInfoData::len, LOG, LSN_FORMAT_ARGS, NameStr, ngettext, pfree(), pg_unreachable, RS_INVAL_HORIZON, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, RS_INVAL_WAL_REMOVED, and SECS_PER_MINUTE.

Referenced by InvalidatePossiblyObsoleteSlot().

◆ RestoreSlotFromDisk()

static void RestoreSlotFromDisk ( const char *  name)
static

Definition at line 2366 of file slot.c.

2367{
2369 int i;
2370 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2371 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2372 int fd;
2373 bool restored = false;
2374 int readBytes;
2375 pg_crc32c checksum;
2376 TimestampTz now = 0;
2377
2378 /* no need to lock here, no concurrent access allowed yet */
2379
2380 /* delete temp file if it exists */
2381 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2382 sprintf(path, "%s/state.tmp", slotdir);
2383 if (unlink(path) < 0 && errno != ENOENT)
2384 ereport(PANIC,
2386 errmsg("could not remove file \"%s\": %m", path)));
2387
2388 sprintf(path, "%s/state", slotdir);
2389
2390 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2391
2392 /* on some operating systems fsyncing a file requires O_RDWR */
2393 fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2394
2395 /*
2396 * We do not need to handle this as we are rename()ing the directory into
2397 * place only after we fsync()ed the state file.
2398 */
2399 if (fd < 0)
2400 ereport(PANIC,
2402 errmsg("could not open file \"%s\": %m", path)));
2403
2404 /*
2405 * Sync state file before we're reading from it. We might have crashed
2406 * while it wasn't synced yet and we shouldn't continue on that basis.
2407 */
2408 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2409 if (pg_fsync(fd) != 0)
2410 ereport(PANIC,
2412 errmsg("could not fsync file \"%s\": %m",
2413 path)));
2415
2416 /* Also sync the parent directory */
2418 fsync_fname(slotdir, true);
2420
2421 /* read part of statefile that's guaranteed to be version independent */
2422 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2423 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2425 if (readBytes != ReplicationSlotOnDiskConstantSize)
2426 {
2427 if (readBytes < 0)
2428 ereport(PANIC,
2430 errmsg("could not read file \"%s\": %m", path)));
2431 else
2432 ereport(PANIC,
2434 errmsg("could not read file \"%s\": read %d of %zu",
2435 path, readBytes,
2437 }
2438
2439 /* verify magic */
2440 if (cp.magic != SLOT_MAGIC)
2441 ereport(PANIC,
2443 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2444 path, cp.magic, SLOT_MAGIC)));
2445
2446 /* verify version */
2447 if (cp.version != SLOT_VERSION)
2448 ereport(PANIC,
2450 errmsg("replication slot file \"%s\" has unsupported version %u",
2451 path, cp.version)));
2452
2453 /* boundary check on length */
2455 ereport(PANIC,
2457 errmsg("replication slot file \"%s\" has corrupted length %u",
2458 path, cp.length)));
2459
2460 /* Now that we know the size, read the entire file */
2461 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2462 readBytes = read(fd,
2463 (char *) &cp + ReplicationSlotOnDiskConstantSize,
2464 cp.length);
2466 if (readBytes != cp.length)
2467 {
2468 if (readBytes < 0)
2469 ereport(PANIC,
2471 errmsg("could not read file \"%s\": %m", path)));
2472 else
2473 ereport(PANIC,
2475 errmsg("could not read file \"%s\": read %d of %zu",
2476 path, readBytes, (Size) cp.length)));
2477 }
2478
2479 if (CloseTransientFile(fd) != 0)
2480 ereport(PANIC,
2482 errmsg("could not close file \"%s\": %m", path)));
2483
2484 /* now verify the CRC */
2485 INIT_CRC32C(checksum);
2486 COMP_CRC32C(checksum,
2489 FIN_CRC32C(checksum);
2490
2491 if (!EQ_CRC32C(checksum, cp.checksum))
2492 ereport(PANIC,
2493 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2494 path, checksum, cp.checksum)));
2495
2496 /*
2497 * If we crashed with an ephemeral slot active, don't restore but delete
2498 * it.
2499 */
2501 {
2502 if (!rmtree(slotdir, true))
2503 {
2505 (errmsg("could not remove directory \"%s\"",
2506 slotdir)));
2507 }
2509 return;
2510 }
2511
2512 /*
2513 * Verify that requirements for the specific slot type are met. That's
2514 * important because if these aren't met we're not guaranteed to retain
2515 * all the necessary resources for the slot.
2516 *
2517 * NB: We have to do so *after* the above checks for ephemeral slots,
2518 * because otherwise a slot that shouldn't exist anymore could prevent
2519 * restarts.
2520 *
2521 * NB: Changing the requirements here also requires adapting
2522 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2523 */
2524 if (cp.slotdata.database != InvalidOid)
2525 {
2527 ereport(FATAL,
2528 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2529 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2530 NameStr(cp.slotdata.name)),
2531 errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2532
2533 /*
2534 * In standby mode, hot standby must be enabled. This check is
2535 * necessary to ensure logical slots are invalidated when they become
2536 * incompatible due to insufficient wal_level. Otherwise, if the
2537 * primary reduces wal_level < logical while hot standby is disabled,
2538 * logical slots would remain valid even after promotion.
2539 */
2541 ereport(FATAL,
2542 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2543 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2544 NameStr(cp.slotdata.name)),
2545 errhint("Change \"hot_standby\" to be \"on\".")));
2546 }
2547 else if (wal_level < WAL_LEVEL_REPLICA)
2548 ereport(FATAL,
2549 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2550 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2551 NameStr(cp.slotdata.name)),
2552 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2553
2554 /* nothing can be active yet, don't lock anything */
2555 for (i = 0; i < max_replication_slots; i++)
2556 {
2557 ReplicationSlot *slot;
2558
2560
2561 if (slot->in_use)
2562 continue;
2563
2564 /* restore the entire set of persistent data */
2565 memcpy(&slot->data, &cp.slotdata,
2567
2568 /* initialize in memory state */
2569 slot->effective_xmin = cp.slotdata.xmin;
2572
2577
2578 slot->in_use = true;
2579 slot->active_pid = 0;
2580
2581 /*
2582 * Set the time since the slot has become inactive after loading the
2583 * slot from the disk into memory. Whoever acquires the slot, i.e.
2584 * makes the slot active, will reset it. Use the same inactive_since
2585 * time for all the slots.
2586 */
2587 if (now == 0)
2589
2591
2592 restored = true;
2593 break;
2594 }
2595
2596 if (!restored)
2597 ereport(FATAL,
2598 (errmsg("too many replication slots active before shutdown"),
2599 errhint("Increase \"max_replication_slots\" and try again.")));
2600}
#define PG_BINARY
Definition: c.h:1244
#define FATAL
Definition: elog.h:41
#define PANIC
Definition: elog.h:42
int CloseTransientFile(int fd)
Definition: fd.c:2831
int pg_fsync(int fd)
Definition: fd.c:385
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2655
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define ReplicationSlotOnDiskChecksummedSize
Definition: slot.c:134
#define ReplicationSlotOnDiskNotChecksummedSize
Definition: slot.c:131
#define ReplicationSlotOnDiskV2Size
Definition: slot.c:137
#define SLOT_VERSION
Definition: slot.c:141
#define SLOT_MAGIC
Definition: slot.c:140
#define ReplicationSlotOnDiskConstantSize
Definition: slot.c:128
uint32 version
Definition: slot.c:74
ReplicationSlotPersistentData slotdata
Definition: slot.c:82
pg_crc32c checksum
Definition: slot.c:71
TransactionId catalog_xmin
Definition: slot.h:97
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
bool EnableHotStandby
Definition: xlog.c:121
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:76
bool StandbyMode
Definition: xlogrecovery.c:148

References ReplicationSlot::active_pid, ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotOnDisk::checksum, CloseTransientFile(), COMP_CRC32C, ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlotPersistentData::database, DEBUG1, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, elog, EnableHotStandby, END_CRIT_SECTION, EQ_CRC32C, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errhint(), errmsg(), FATAL, fd(), FIN_CRC32C, fsync_fname(), GetCurrentTimestamp(), i, ReplicationSlot::in_use, INIT_CRC32C, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlotOnDisk::length, ReplicationSlotOnDisk::magic, max_replication_slots, MAXPGPATH, name, ReplicationSlotPersistentData::name, NameStr, now(), OpenTransientFile(), PANIC, ReplicationSlotPersistentData::persistency, PG_BINARY, pg_fsync(), PG_REPLSLOT_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotOnDiskChecksummedSize, ReplicationSlotOnDiskConstantSize, ReplicationSlotOnDiskNotChecksummedSize, ReplicationSlotOnDiskV2Size, ReplicationSlotSetInactiveSince(), rmtree(), RS_PERSISTENT, SLOT_MAGIC, SLOT_VERSION, ReplicationSlotOnDisk::slotdata, sprintf, StandbyMode, START_CRIT_SECTION, ReplicationSlotOnDisk::version, wal_level, WAL_LEVEL_LOGICAL, WAL_LEVEL_REPLICA, WARNING, and ReplicationSlotPersistentData::xmin.

Referenced by StartupReplicationSlots().

◆ SaveSlotToPath()

static void SaveSlotToPath ( ReplicationSlot slot,
const char *  dir,
int  elevel 
)
static

Definition at line 2211 of file slot.c.

2212{
2213 char tmppath[MAXPGPATH];
2214 char path[MAXPGPATH];
2215 int fd;
2217 bool was_dirty;
2218
2219 /* first check whether there's something to write out */
2220 SpinLockAcquire(&slot->mutex);
2221 was_dirty = slot->dirty;
2222 slot->just_dirtied = false;
2223 SpinLockRelease(&slot->mutex);
2224
2225 /* and don't do anything if there's nothing to write */
2226 if (!was_dirty)
2227 return;
2228
2230
2231 /* silence valgrind :( */
2232 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2233
2234 sprintf(tmppath, "%s/state.tmp", dir);
2235 sprintf(path, "%s/state", dir);
2236
2237 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2238 if (fd < 0)
2239 {
2240 /*
2241 * If not an ERROR, then release the lock before returning. In case
2242 * of an ERROR, the error recovery path automatically releases the
2243 * lock, but no harm in explicitly releasing even in that case. Note
2244 * that LWLockRelease() could affect errno.
2245 */
2246 int save_errno = errno;
2247
2249 errno = save_errno;
2250 ereport(elevel,
2252 errmsg("could not create file \"%s\": %m",
2253 tmppath)));
2254 return;
2255 }
2256
2257 cp.magic = SLOT_MAGIC;
2259 cp.version = SLOT_VERSION;
2261
2262 SpinLockAcquire(&slot->mutex);
2263
2264 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2265
2266 SpinLockRelease(&slot->mutex);
2267
2271 FIN_CRC32C(cp.checksum);
2272
2273 errno = 0;
2274 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2275 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2276 {
2277 int save_errno = errno;
2278
2282
2283 /* if write didn't set errno, assume problem is no disk space */
2284 errno = save_errno ? save_errno : ENOSPC;
2285 ereport(elevel,
2287 errmsg("could not write to file \"%s\": %m",
2288 tmppath)));
2289 return;
2290 }
2292
2293 /* fsync the temporary file */
2294 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2295 if (pg_fsync(fd) != 0)
2296 {
2297 int save_errno = errno;
2298
2302 errno = save_errno;
2303 ereport(elevel,
2305 errmsg("could not fsync file \"%s\": %m",
2306 tmppath)));
2307 return;
2308 }
2310
2311 if (CloseTransientFile(fd) != 0)
2312 {
2313 int save_errno = errno;
2314
2316 errno = save_errno;
2317 ereport(elevel,
2319 errmsg("could not close file \"%s\": %m",
2320 tmppath)));
2321 return;
2322 }
2323
2324 /* rename to permanent file, fsync file and directory */
2325 if (rename(tmppath, path) != 0)
2326 {
2327 int save_errno = errno;
2328
2330 errno = save_errno;
2331 ereport(elevel,
2333 errmsg("could not rename file \"%s\" to \"%s\": %m",
2334 tmppath, path)));
2335 return;
2336 }
2337
2338 /*
2339 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2340 */
2342
2343 fsync_fname(path, false);
2344 fsync_fname(dir, true);
2346
2348
2349 /*
2350 * Successfully wrote; unset the dirty bit unless somebody dirtied the
2351 * slot again already, and remember the confirmed_flush LSN value.
2352 */
2353 SpinLockAcquire(&slot->mutex);
2354 if (!slot->just_dirtied)
2355 slot->dirty = false;
2357 SpinLockRelease(&slot->mutex);
2358
2360}
#define write(a, b, c)
Definition: win32.h:14

References ReplicationSlotOnDisk::checksum, CloseTransientFile(), COMP_CRC32C, ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlot::dirty, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, ReplicationSlot::io_in_progress_lock, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlotOnDisk::length, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlotOnDisk::magic, MAXPGPATH, ReplicationSlot::mutex, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_REPLSLOT_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), ReplicationSlotOnDiskChecksummedSize, ReplicationSlotOnDiskNotChecksummedSize, ReplicationSlotOnDiskV2Size, SLOT_MAGIC, SLOT_VERSION, ReplicationSlotOnDisk::slotdata, SpinLockAcquire, SpinLockRelease, sprintf, START_CRIT_SECTION, ReplicationSlotOnDisk::version, and write.

Referenced by CheckPointReplicationSlots(), CreateSlotOnDisk(), and ReplicationSlotSave().

◆ SearchNamedReplicationSlot()

ReplicationSlot * SearchNamedReplicationSlot ( const char *  name,
bool  need_lock 
)

Definition at line 479 of file slot.c.

480{ /* Return the in-use slot whose name equals 'name', or NULL if none; ReplicationSlotControlLock is taken (shared) here only when need_lock is true. */
481 int i;
482 ReplicationSlot *slot = NULL;
483
484 if (need_lock)
485 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
486
487 for (i = 0; i < max_replication_slots; i++)
488 {
489 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
490
491 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
492 {
493 slot = s;
494 break;
495 }
496 }
497
498 if (need_lock)
499 LWLockRelease(ReplicationSlotControlLock);
500
501 return slot;
502}

References ReplicationSlot::data, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, name, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by get_replslot_index(), pg_ls_replslotdir(), pgstat_reset_replslot(), ReadReplicationSlot(), ReplicationSlotAcquire(), StandbySlotsHaveCaughtup(), synchronize_one_slot(), and validate_sync_standby_slots().

◆ SlotExistsInSyncStandbySlots()

bool SlotExistsInSyncStandbySlots ( const char *  slot_name)

Definition at line 2770 of file slot.c.

2771{ /* Report whether slot_name is one of the names held in synchronized_standby_slots_config. */
2772 const char *standby_slot_name;
2773
2774 /* Return false if there is no value in synchronized_standby_slots */
2775 if (synchronized_standby_slots_config == NULL)
2776 return false;
2777
2778 /*
2779 * XXX: We are not expecting this list to be long so a linear search
2780 * shouldn't hurt but if that turns out not to be true then we can cache
2781 * this information for each WalSender as well.
2782 */
2783 standby_slot_name = synchronized_standby_slots_config->slot_names;
2784 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2785 {
2786 if (strcmp(standby_slot_name, slot_name) == 0)
2787 return true;
2788
2789 standby_slot_name += strlen(standby_slot_name) + 1; /* skip this name and its terminating NUL to reach the next one */
2790 }
2791
2792 return false;
2793}

References i, SyncStandbySlotsConfigData::nslotnames, SyncStandbySlotsConfigData::slot_names, and synchronized_standby_slots_config.

Referenced by PhysicalWakeupLogicalWalSnd().

◆ StandbySlotsHaveCaughtup()

bool StandbySlotsHaveCaughtup ( XLogRecPtr  wait_for_lsn,
int  elevel 
)

Definition at line 2803 of file slot.c.

2804{ /* Return true if no waiting is needed or every slot named in synchronized_standby_slots has restart_lsn >= wait_for_lsn; otherwise report the first offending slot at 'elevel' and return false. */
2805 const char *name;
2806 int caught_up_slot_num = 0;
2807 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2808
2809 /*
2810 * Don't need to wait for the standbys to catch up if there is no value in
2811 * synchronized_standby_slots.
2812 */
2813 if (synchronized_standby_slots_config == NULL)
2814 return true;
2815
2816 /*
2817 * Don't need to wait for the standbys to catch up if we are on a standby
2818 * server, since we do not support syncing slots to cascading standbys.
2819 */
2820 if (RecoveryInProgress())
2821 return true;
2822
2823 /*
2824 * Don't need to wait for the standbys to catch up if they are already
2825 * beyond the specified WAL location.
2826 */
2827 if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
2828 ss_oldest_flush_lsn >= wait_for_lsn)
2829 return true;
2830
2831 /*
2832 * To prevent concurrent slot dropping and creation while filtering the
2833 * slots, take the ReplicationSlotControlLock outside of the loop.
2834 */
2835 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2836
2837 name = synchronized_standby_slots_config->slot_names;
2838 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2839 {
2840 XLogRecPtr restart_lsn;
2841 bool invalidated;
2842 bool inactive;
2843 ReplicationSlot *slot;
2844
2845 slot = SearchNamedReplicationSlot(name, false);
2846
2847 /*
2848 * If a slot name provided in synchronized_standby_slots does not
2849 * exist, report a message and exit the loop.
2850 *
2851 * Though validate_sync_standby_slots (the GUC check_hook) tries to
2852 * avoid this, it can nonetheless happen because the user can specify
2853 * a nonexistent slot name before server startup. That function cannot
2854 * validate such a slot during startup, as ReplicationSlotCtl is not
2855 * initialized by then. Also, the user might have dropped one slot.
2856 */
2857 if (!slot)
2858 {
2859 ereport(elevel,
2860 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2861 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2862 name, "synchronized_standby_slots"),
2863 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2864 name),
2865 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2866 name, "synchronized_standby_slots"));
2867 break;
2868 }
2869
2870 /* Same as above: if a slot is not physical, exit the loop. */
2871 if (SlotIsLogical(slot))
2872 {
2873 ereport(elevel,
2874 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2875 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2876 name, "synchronized_standby_slots"),
2877 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2878 name),
2879 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2880 name, "synchronized_standby_slots"));
2881 break;
2882 }
2883
2884 SpinLockAcquire(&slot->mutex);
2885 restart_lsn = slot->data.restart_lsn;
2886 invalidated = slot->data.invalidated != RS_INVAL_NONE;
2887 inactive = slot->active_pid == 0;
2888 SpinLockRelease(&slot->mutex);
2889
2890 if (invalidated)
2891 {
2892 /* Specified physical slot has been invalidated */
2893 ereport(elevel,
2894 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2895 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2896 name, "synchronized_standby_slots"),
2897 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2898 name),
2899 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
2900 name, "synchronized_standby_slots"));
2901 break;
2902 }
2903
2904 if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2905 {
2906 /* Log a message if no active_pid for this physical slot */
2907 if (inactive)
2908 ereport(elevel,
2909 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2910 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
2911 name, "synchronized_standby_slots"),
2912 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2913 name),
2914 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
2915 name, "synchronized_standby_slots"));
2916
2917 /* This slot hasn't caught up, so stop checking; the count check below returns false. */
2918 break;
2919 }
2920
2921 Assert(restart_lsn >= wait_for_lsn);
2922
2923 if (XLogRecPtrIsInvalid(min_restart_lsn) ||
2924 min_restart_lsn > restart_lsn)
2925 min_restart_lsn = restart_lsn;
2926
2927 caught_up_slot_num++;
2928
2929 name += strlen(name) + 1;
2930 }
2931
2932 LWLockRelease(ReplicationSlotControlLock);
2933
2934 /*
2935 * Return false if not all the standbys have caught up to the specified
2936 * WAL location.
2937 */
2938 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
2939 return false;
2940
2941 /* The ss_oldest_flush_lsn must not retreat. */
2942 Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
2943 min_restart_lsn >= ss_oldest_flush_lsn);
2944
2945 ss_oldest_flush_lsn = min_restart_lsn;
2946
2947 return true;
2948}

References ReplicationSlot::active_pid, Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errhint(), errmsg(), i, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, name, SyncStandbySlotsConfigData::nslotnames, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SearchNamedReplicationSlot(), SyncStandbySlotsConfigData::slot_names, SlotIsLogical, SpinLockAcquire, SpinLockRelease, ss_oldest_flush_lsn, synchronized_standby_slots_config, and XLogRecPtrIsInvalid.

Referenced by NeedToWaitForStandbys(), and WaitForStandbyConfirmation().

◆ StartupReplicationSlots()

void StartupReplicationSlots ( void  )

Definition at line 2089 of file slot.c.

2090{ /* Walk PG_REPLSLOT_DIR: remove leftover "*.tmp" slot directories and restore every other directory entry via RestoreSlotFromDisk(). */
2091 DIR *replication_dir;
2092 struct dirent *replication_de;
2093
2094 elog(DEBUG1, "starting up replication slots");
2095
2096 /* restore all slots by iterating over all on-disk entries */
2097 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2098 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2099 {
2100 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2101 PGFileType de_type;
2102
2103 if (strcmp(replication_de->d_name, ".") == 0 ||
2104 strcmp(replication_de->d_name, "..") == 0)
2105 continue;
2106
2107 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2108 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2109
2110 /* we're only creating directories here, skip if it's not ours */
2111 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2112 continue;
2113
2114 /* we crashed while a slot was being setup or deleted, clean up */
2115 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2116 {
2117 if (!rmtree(path, true))
2118 {
2119 ereport(WARNING,
2120 (errmsg("could not remove directory \"%s\"",
2121 path)));
2122 continue;
2123 }
2124 fsync_fname(PG_REPLSLOT_DIR, true);
2125 continue;
2126 }
2127
2128 /* looks like a slot in a normal state, restore */
2129 RestoreSlotFromDisk(replication_de->d_name);
2130 }
2131 FreeDir(replication_dir);
2132
2133 /* currently no slots exist, we're done. */
2134 if (max_replication_slots <= 0)
2135 return;
2136
2137 /* Now that we have recovered all the data, compute replication xmin */
2138 ReplicationSlotsComputeRequiredXmin(false);
2139 ReplicationSlotsComputeRequiredLSN();
2140}
int FreeDir(DIR *dir)
Definition: fd.c:2983
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2865
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2931
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:526
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
#define snprintf
Definition: port.h:239
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:2366
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:31
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15

References AllocateDir(), dirent::d_name, DEBUG1, elog, ereport, errmsg(), FreeDir(), fsync_fname(), get_dirent_type(), max_replication_slots, MAXPGPATH, PG_REPLSLOT_DIR, pg_str_endswith(), PGFILETYPE_DIR, PGFILETYPE_ERROR, ReadDir(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RestoreSlotFromDisk(), rmtree(), snprintf, and WARNING.

Referenced by StartupXLOG().

◆ StaticAssertDecl()

StaticAssertDecl ( lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES+1),
"array length mismatch"   
)

◆ validate_sync_standby_slots()

static bool validate_sync_standby_slots ( char *  rawname,
List **  elemlist 
)
static

Definition at line 2646 of file slot.c.

2647{ /* Split rawname into *elemlist and, when MyProc is set, check that each name is an existing physical slot; returns false with a GUC errdetail on failure. */
2648 bool ok;
2649
2650 /* Verify syntax and parse string into a list of identifiers */
2651 ok = SplitIdentifierString(rawname, ',', elemlist);
2652
2653 if (!ok)
2654 {
2655 GUC_check_errdetail("List syntax is invalid.");
2656 }
2657 else if (MyProc)
2658 {
2659 /*
2660 * Check that each specified slot exists and is physical.
2661 *
2662 * Because we need an LWLock, we cannot do this on processes without a
2663 * PGPROC, so we skip it there; but see comments in
2664 * StandbySlotsHaveCaughtup() as to why that's not a problem.
2665 */
2666 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2667
2668 foreach_ptr(char, name, *elemlist)
2669 {
2670 ReplicationSlot *slot;
2671
2672 slot = SearchNamedReplicationSlot(name, false);
2673
2674 if (!slot)
2675 {
2676 GUC_check_errdetail("Replication slot \"%s\" does not exist.",
2677 name);
2678 ok = false;
2679 break;
2680 }
2681
2682 if (!SlotIsPhysical(slot))
2683 {
2684 GUC_check_errdetail("\"%s\" is not a physical replication slot.",
2685 name);
2686 ok = false;
2687 break;
2688 }
2689 }
2690
2691 LWLockRelease(ReplicationSlotControlLock);
2692 }
2693
2694 return ok;
2695}
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3525

References foreach_ptr, GUC_check_errdetail, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProc, name, SearchNamedReplicationSlot(), SlotIsPhysical, and SplitIdentifierString().

Referenced by check_synchronized_standby_slots().

◆ WaitForStandbyConfirmation()

void WaitForStandbyConfirmation ( XLogRecPtr  wait_for_lsn)

Definition at line 2957 of file slot.c.

2958{ /* Block until StandbySlotsHaveCaughtup(wait_for_lsn) reports true, re-reading the configuration on reload requests while waiting. */
2959 /*
2960 * Don't need to wait for the standby to catch up if the current acquired
2961 * slot is not a logical failover slot, or there is no value in
2962 * synchronized_standby_slots.
2963 */
2964 if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
2965 return;
2966
2967 ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
2968
2969 for (;;)
2970 {
2971 CHECK_FOR_INTERRUPTS();
2972
2973 if (ConfigReloadPending)
2974 {
2975 ConfigReloadPending = false;
2976 ProcessConfigFile(PGC_SIGHUP);
2977 }
2978
2979 /* Exit if done waiting for every slot. */
2980 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
2981 break;
2982
2983 /*
2984 * Wait for the slots in the synchronized_standby_slots to catch up,
2985 * but use a timeout (1s) so we can also check if the
2986 * synchronized_standby_slots has been changed.
2987 */
2988 ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
2989 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
2990 }
2991
2992 ConditionVariableCancelSleep();
2993}
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2803
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:110

References CHECK_FOR_INTERRUPTS, ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableTimedSleep(), ConfigReloadPending, ReplicationSlot::data, ReplicationSlotPersistentData::failover, MyReplicationSlot, PGC_SIGHUP, ProcessConfigFile(), StandbySlotsHaveCaughtup(), synchronized_standby_slots_config, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtl, and WARNING.

Referenced by LogicalSlotAdvanceAndCheckSnapState(), and pg_logical_slot_get_changes_guts().

Variable Documentation

◆ idle_replication_slot_timeout_mins

int idle_replication_slot_timeout_mins = 0

◆ max_replication_slots

◆ MyReplicationSlot

ReplicationSlot* MyReplicationSlot = NULL

Definition at line 147 of file slot.c.

Referenced by binary_upgrade_logical_slot_has_caught_up(), copy_replication_slot(), create_logical_replication_slot(), create_physical_replication_slot(), CreateDecodingContext(), CreateInitDecodingContext(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), LogicalConfirmReceivedLocation(), LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), LogicalReplicationSlotHasPendingWal(), LogicalSlotAdvanceAndCheckSnapState(), NeedToWaitForStandbys(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_physical_replication_slot_advance(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), PhysicalReplicationSlotNewXmin(), PhysicalWakeupLogicalWalSnd(), PostgresMain(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), ReorderBufferAllocate(), ReorderBufferFree(), ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReplicationSlotAcquire(), ReplicationSlotAlter(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDrop(), ReplicationSlotDropAcquired(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotsDropDBSlots(), ReplicationSlotShmemExit(), reserve_wal_for_local_slot(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), StartupDecodingContext(), synchronize_one_slot(), update_and_persist_local_synced_slot(), update_local_synced_slot(), WaitForStandbyConfirmation(), and WalSndErrorCleanup().

◆ ReplicationSlotCtl

◆ SlotInvalidationCauses

const SlotInvalidationCauseMap SlotInvalidationCauses[]
static
Initial value:
= {
{RS_INVAL_NONE, "none"},
{RS_INVAL_WAL_REMOVED, "wal_removed"},
{RS_INVAL_HORIZON, "rows_removed"},
{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
}

Definition at line 112 of file slot.c.

Referenced by GetSlotInvalidationCause(), and GetSlotInvalidationCauseName().

◆ ss_oldest_flush_lsn

XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr
static

Definition at line 172 of file slot.c.

Referenced by assign_synchronized_standby_slots(), and StandbySlotsHaveCaughtup().

◆ synchronized_standby_slots

char* synchronized_standby_slots

Definition at line 163 of file slot.c.

◆ synchronized_standby_slots_config