PostgreSQL Source Code git master
slot.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "common/file_utils.h"
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/injection_point.h"
#include "utils/varlena.h"
Include dependency graph for slot.c:

Go to the source code of this file.

Data Structures

struct  ReplicationSlotOnDisk
 
struct  SyncStandbySlotsConfigData
 
struct  SlotInvalidationCauseMap
 

Macros

#define ReplicationSlotOnDiskConstantSize    offsetof(ReplicationSlotOnDisk, slotdata)
 
#define ReplicationSlotOnDiskNotChecksummedSize    offsetof(ReplicationSlotOnDisk, version)
 
#define ReplicationSlotOnDiskChecksummedSize    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
 
#define ReplicationSlotOnDiskV2Size    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
#define SLOT_MAGIC   0x1051CA1 /* format identifier */
 
#define SLOT_VERSION   5 /* version for new files */
 

Typedefs

typedef struct ReplicationSlotOnDisk ReplicationSlotOnDisk
 
typedef struct SlotInvalidationCauseMap SlotInvalidationCauseMap
 

Functions

 StaticAssertDecl (lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
 
static void ReplicationSlotShmemExit (int code, Datum arg)
 
static bool IsSlotForConflictCheck (const char *name)
 
static void ReplicationSlotDropPtr (ReplicationSlot *slot)
 
static void RestoreSlotFromDisk (const char *name)
 
static void CreateSlotOnDisk (ReplicationSlot *slot)
 
static void SaveSlotToPath (ReplicationSlot *slot, const char *dir, int elevel)
 
Size ReplicationSlotsShmemSize (void)
 
void ReplicationSlotsShmemInit (void)
 
void ReplicationSlotInitialize (void)
 
bool ReplicationSlotValidateName (const char *name, bool allow_reserved_name, int elevel)
 
bool ReplicationSlotValidateNameInternal (const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
 
void ReplicationSlotCreate (const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
 
ReplicationSlot * SearchNamedReplicationSlot (const char *name, bool need_lock)
 
int ReplicationSlotIndex (ReplicationSlot *slot)
 
bool ReplicationSlotName (int index, Name name)
 
void ReplicationSlotAcquire (const char *name, bool nowait, bool error_if_invalid)
 
void ReplicationSlotRelease (void)
 
void ReplicationSlotCleanup (bool synced_only)
 
void ReplicationSlotDrop (const char *name, bool nowait)
 
void ReplicationSlotAlter (const char *name, const bool *failover, const bool *two_phase)
 
void ReplicationSlotDropAcquired (void)
 
void ReplicationSlotSave (void)
 
void ReplicationSlotMarkDirty (void)
 
void ReplicationSlotPersist (void)
 
void ReplicationSlotsComputeRequiredXmin (bool already_locked)
 
void ReplicationSlotsComputeRequiredLSN (void)
 
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN (void)
 
bool ReplicationSlotsCountDBSlots (Oid dboid, int *nslots, int *nactive)
 
void ReplicationSlotsDropDBSlots (Oid dboid)
 
void CheckSlotRequirements (void)
 
void CheckSlotPermissions (void)
 
void ReplicationSlotReserveWal (void)
 
static void ReportSlotInvalidation (ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
 
static bool CanInvalidateIdleSlot (ReplicationSlot *s)
 
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause (uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
 
static bool InvalidatePossiblyObsoleteSlot (uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
 
bool InvalidateObsoleteReplicationSlots (uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
 
void CheckPointReplicationSlots (bool is_shutdown)
 
void StartupReplicationSlots (void)
 
ReplicationSlotInvalidationCause GetSlotInvalidationCause (const char *cause_name)
 
const char * GetSlotInvalidationCauseName (ReplicationSlotInvalidationCause cause)
 
static bool validate_sync_standby_slots (char *rawname, List **elemlist)
 
bool check_synchronized_standby_slots (char **newval, void **extra, GucSource source)
 
void assign_synchronized_standby_slots (const char *newval, void *extra)
 
bool SlotExistsInSyncStandbySlots (const char *slot_name)
 
bool StandbySlotsHaveCaughtup (XLogRecPtr wait_for_lsn, int elevel)
 
void WaitForStandbyConfirmation (XLogRecPtr wait_for_lsn)
 

Variables

static const SlotInvalidationCauseMap SlotInvalidationCauses []
 
ReplicationSlotCtlData * ReplicationSlotCtl = NULL
 
ReplicationSlot * MyReplicationSlot = NULL
 
int max_replication_slots = 10
 
int idle_replication_slot_timeout_secs = 0
 
char * synchronized_standby_slots
 
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
 
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr
 

Macro Definition Documentation

◆ ReplicationSlotOnDiskChecksummedSize

#define ReplicationSlotOnDiskChecksummedSize    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize

Definition at line 135 of file slot.c.

◆ ReplicationSlotOnDiskConstantSize

#define ReplicationSlotOnDiskConstantSize    offsetof(ReplicationSlotOnDisk, slotdata)

Definition at line 129 of file slot.c.

◆ ReplicationSlotOnDiskNotChecksummedSize

#define ReplicationSlotOnDiskNotChecksummedSize    offsetof(ReplicationSlotOnDisk, version)

Definition at line 132 of file slot.c.

◆ ReplicationSlotOnDiskV2Size

#define ReplicationSlotOnDiskV2Size    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize

Definition at line 138 of file slot.c.

◆ SLOT_MAGIC

#define SLOT_MAGIC   0x1051CA1 /* format identifier */

Definition at line 141 of file slot.c.

◆ SLOT_VERSION

#define SLOT_VERSION   5 /* version for new files */

Definition at line 142 of file slot.c.

Typedef Documentation

◆ ReplicationSlotOnDisk

◆ SlotInvalidationCauseMap

Function Documentation

◆ assign_synchronized_standby_slots()

void assign_synchronized_standby_slots ( const char *  newval,
void *  extra 
)

Definition at line 2862 of file slot.c.

2863{
2864 /*
2865 * The standby slots may have changed, so we must recompute the oldest
2866 * LSN.
2867 */
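2868 ss_oldest_flush_lsn = InvalidXLogRecPtr;
2869
2870 synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;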
2871}
static XLogRecPtr ss_oldest_flush_lsn
Definition: slot.c:173
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition: slot.c:167
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References InvalidXLogRecPtr, ss_oldest_flush_lsn, and synchronized_standby_slots_config.

◆ CanInvalidateIdleSlot()

static bool CanInvalidateIdleSlot ( ReplicationSlot * s)
inline static

Definition at line 1733 of file slot.c.

1734{
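1735 return (idle_replication_slot_timeout_secs != 0 &&
1736 XLogRecPtrIsValid(s->data.restart_lsn) &&
1737 s->inactive_since > 0 &&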
1738 !(RecoveryInProgress() && s->data.synced));
1739}
int idle_replication_slot_timeout_secs
Definition: slot.c:158
ReplicationSlotPersistentData data
Definition: slot.h:210
TimestampTz inactive_since
Definition: slot.h:242
bool RecoveryInProgress(void)
Definition: xlog.c:6404
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29

References ReplicationSlot::data, idle_replication_slot_timeout_secs, ReplicationSlot::inactive_since, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, ReplicationSlotPersistentData::synced, and XLogRecPtrIsValid.

Referenced by DetermineSlotInvalidationCause().

◆ check_synchronized_standby_slots()

bool check_synchronized_standby_slots ( char **  newval,
void **  extra,
GucSource  source 
)

Definition at line 2806 of file slot.c.

2807{
2808 char *rawname;
2809 char *ptr;
2810 List *elemlist;
2811 int size;
2812 bool ok;
2813 SyncStandbySlotsConfigData *config;
2814
2815 if ((*newval)[0] == '\0')
2816 return true;
2817
2818 /* Need a modifiable copy of the GUC string */
2819 rawname = pstrdup(*newval);
2820
2821 /* Now verify if the specified slots exist and have correct type */
2822 ok = validate_sync_standby_slots(rawname, &elemlist);
2823
2824 if (!ok || elemlist == NIL)
2825 {
2826 pfree(rawname);
2827 list_free(elemlist);
2828 return ok;
2829 }
2830
2831 /* Compute the size required for the SyncStandbySlotsConfigData struct */
2832 size = offsetof(SyncStandbySlotsConfigData, slot_names);
2833 foreach_ptr(char, slot_name, elemlist)
2834 size += strlen(slot_name) + 1;
2835
2836 /* GUC extra value must be guc_malloc'd, not palloc'd */
2837 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2838 if (!config)
2839 return false;
2840
2841 /* Transform the data into SyncStandbySlotsConfigData */
2842 config->nslotnames = list_length(elemlist);
2843
2844 ptr = config->slot_names;
2845 foreach_ptr(char, slot_name, elemlist)
2846 {
2847 strcpy(ptr, slot_name);
2848 ptr += strlen(slot_name) + 1;
2849 }
2850
2851 *extra = config;
2852
2853 pfree(rawname);
2854 list_free(elemlist);
2855 return true;
2856}
#define LOG
Definition: elog.h:31
void * guc_malloc(int elevel, size_t size)
Definition: guc.c:636
#define newval
void list_free(List *list)
Definition: list.c:1546
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void pfree(void *pointer)
Definition: mcxt.c:1594
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition: slot.c:2772
Definition: pg_list.h:54
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition: slot.c:101

References foreach_ptr, guc_malloc(), list_free(), list_length(), LOG, newval, NIL, SyncStandbySlotsConfigData::nslotnames, pfree(), pstrdup(), SyncStandbySlotsConfigData::slot_names, and validate_sync_standby_slots().

◆ CheckPointReplicationSlots()

void CheckPointReplicationSlots ( bool  is_shutdown)

Definition at line 2128 of file slot.c.

2129{
2130 int i;
2131 bool last_saved_restart_lsn_updated = false;
2132
2133 elog(DEBUG1, "performing replication slot checkpoint");
2134
2135 /*
2136 * Prevent any slot from being created/dropped while we're active. As we
2137 * explicitly do *not* want to block iterating over replication_slots or
2138 * acquiring a slot we cannot take the control lock - but that's OK,
2139 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2140 * enough to guarantee that nobody can change the in_use bits on us.
2141 *
2142 * Additionally, acquiring the Allocation lock is necessary to serialize
2143 * the slot flush process with concurrent slot WAL reservation. This
2144 * ensures that the WAL position being reserved is either flushed to disk
2145 * or is beyond or equal to the redo pointer of the current checkpoint
2146 * (See ReplicationSlotReserveWal for details).
2147 */
2148 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2149
2150 for (i = 0; i < max_replication_slots; i++)
2151 {
2152 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2153 char path[MAXPGPATH];
2154
2155 if (!s->in_use)
2156 continue;
2157
2158 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2159 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2160
2161 /*
2162 * Slot's data is not flushed each time the confirmed_flush LSN is
2163 * updated as that could lead to frequent writes. However, we decide
2164 * to force a flush of all logical slot's data at the time of shutdown
2165 * if the confirmed_flush LSN is changed since we last flushed it to
2166 * disk. This helps in avoiding an unnecessary retreat of the
2167 * confirmed_flush LSN after restart.
2168 */
2169 if (is_shutdown && SlotIsLogical(s))
2170 {
2171 SpinLockAcquire(&s->mutex);
2172
2173 if (s->data.invalidated == RS_INVAL_NONE &&
2174 s->data.confirmed_flush > s->last_saved_confirmed_flush)
2175 {
2176 s->just_dirtied = true;
2177 s->dirty = true;
2178 }
2179 SpinLockRelease(&s->mutex);
2180 }
2181
2182 /*
2183 * Track if we're going to update slot's last_saved_restart_lsn. We
2184 * need this to know if we need to recompute the required LSN.
2185 */
2186 if (s->last_saved_restart_lsn != s->data.restart_lsn)
2187 last_saved_restart_lsn_updated = true;
2188
2189 SaveSlotToPath(s, path, LOG);
2190 }
2191 LWLockRelease(ReplicationSlotAllocationLock);
2192
2193 /*
2194 * Recompute the required LSN if SaveSlotToPath() updated
2195 * last_saved_restart_lsn for any slot.
2196 */
2197 if (last_saved_restart_lsn_updated)
2198 ReplicationSlotsComputeRequiredLSN();
2199}
#define NameStr(name)
Definition: c.h:765
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
int i
Definition: isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
#define MAXPGPATH
#define sprintf
Definition: port.h:262
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:2328
int max_replication_slots
Definition: slot.c:151
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:145
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1234
#define PG_REPLSLOT_DIR
Definition: slot.h:21
@ RS_INVAL_NONE
Definition: slot.h:60
#define SlotIsLogical(slot)
Definition: slot.h:285
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
ReplicationSlot replication_slots[1]
Definition: slot.h:296
XLogRecPtr confirmed_flush
Definition: slot.h:136
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:128
slock_t mutex
Definition: slot.h:183
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:235
bool in_use
Definition: slot.h:186
bool just_dirtied
Definition: slot.h:192
XLogRecPtr last_saved_restart_lsn
Definition: slot.h:268
bool dirty
Definition: slot.h:193

References ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, DEBUG1, ReplicationSlot::dirty, elog, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LOG, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SaveSlotToPath(), SlotIsLogical, SpinLockAcquire, SpinLockRelease, and sprintf.

Referenced by CheckPointGuts().

◆ CheckSlotPermissions()

void CheckSlotPermissions ( void  )

Definition at line 1555 of file slot.c.

1556{
1557 if (!has_rolreplication(GetUserId()))
1558 ereport(ERROR,
1559 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1560 errmsg("permission denied to use replication slots"),
1561 errdetail("Only roles with the %s attribute may use replication slots.",
1562 "REPLICATION")));
1563}
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
Oid GetUserId(void)
Definition: miscinit.c:469
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:688

References ereport, errcode(), errdetail(), errmsg(), ERROR, GetUserId(), and has_rolreplication().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), and pg_sync_replication_slots().

◆ CheckSlotRequirements()

void CheckSlotRequirements ( void  )

Definition at line 1533 of file slot.c.

1534{
1535 /*
1536 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1537 * needs the same check.
1538 */
1539
1540 if (max_replication_slots == 0)
1541 ereport(ERROR,
1542 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1543 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1544
1545 if (wal_level < WAL_LEVEL_REPLICA)
1546 ereport(ERROR,
1547 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1548 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1549}
int wal_level
Definition: xlog.c:133
@ WAL_LEVEL_REPLICA
Definition: xlog.h:75

References ereport, errcode(), errmsg(), ERROR, max_replication_slots, wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckLogicalDecodingRequirements(), copy_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

◆ CreateSlotOnDisk()

static void CreateSlotOnDisk ( ReplicationSlot * slot)
static

Definition at line 2267 of file slot.c.

2268{
2269 char tmppath[MAXPGPATH];
2270 char path[MAXPGPATH];
2271 struct stat st;
2272
2273 /*
2274 * No need to take out the io_in_progress_lock, nobody else can see this
2275 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2276 * takes out the lock, if we'd take the lock here, we'd deadlock.
2277 */
2278
2279 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2280 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2281
2282 /*
2283 * It's just barely possible that some previous effort to create or drop a
2284 * slot with this name left a temp directory lying around. If that seems
2285 * to be the case, try to remove it. If the rmtree() fails, we'll error
2286 * out at the MakePGDirectory() below, so we don't bother checking
2287 * success.
2288 */
2289 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2290 rmtree(tmppath, true);
2291
2292 /* Create and fsync the temporary slot directory. */
2293 if (MakePGDirectory(tmppath) < 0)
2294 ereport(ERROR,
2295 (errcode_for_file_access(),
2296 errmsg("could not create directory \"%s\": %m",
2297 tmppath)));
2298 fsync_fname(tmppath, true);
2299
2300 /* Write the actual state file. */
2301 slot->dirty = true; /* signal that we really need to write */
2302 SaveSlotToPath(slot, tmppath, ERROR);
2303
2304 /* Rename the directory into place. */
2305 if (rename(tmppath, path) != 0)
2306 ereport(ERROR,
2307 (errcode_for_file_access(),
2308 errmsg("could not rename file \"%s\" to \"%s\": %m",
2309 tmppath, path)));
2310
2311 /*
2312 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2313 * would persist after an OS crash or not - so, force a restart. The
2314 * restart would try to fsync this again till it works.
2315 */
2316 START_CRIT_SECTION();
2317
2318 fsync_fname(path, true);
2319 fsync_fname(PG_REPLSLOT_DIR, true);
2320
2321 END_CRIT_SECTION();
2322}
int errcode_for_file_access(void)
Definition: elog.c:886
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3959
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:753
#define START_CRIT_SECTION()
Definition: miscadmin.h:150
#define END_CRIT_SECTION()
Definition: miscadmin.h:152
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
#define stat
Definition: win32_port.h:274
#define S_ISDIR(m)
Definition: win32_port.h:315

References ReplicationSlot::data, ReplicationSlot::dirty, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg(), ERROR, fsync_fname(), MakePGDirectory(), MAXPGPATH, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, rmtree(), S_ISDIR, SaveSlotToPath(), sprintf, stat::st_mode, START_CRIT_SECTION, and stat.

Referenced by ReplicationSlotCreate().

◆ DetermineSlotInvalidationCause()

static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause ( uint32  possible_causes,
ReplicationSlot * s,
XLogRecPtr  oldestLSN,
Oid  dboid,
TransactionId  snapshotConflictHorizon,
TimestampTz * inactive_since,
TimestampTz  now 
)
static

Definition at line 1749 of file slot.c.

1753{
1754 Assert(possible_causes != RS_INVAL_NONE);
1755
1756 if (possible_causes & RS_INVAL_WAL_REMOVED)
1757 {
1758 XLogRecPtr restart_lsn = s->data.restart_lsn;
1759
1760 if (XLogRecPtrIsValid(restart_lsn) &&
1761 restart_lsn < oldestLSN)
1762 return RS_INVAL_WAL_REMOVED;
1763 }
1764
1765 if (possible_causes & RS_INVAL_HORIZON)
1766 {
1767 /* invalid DB oid signals a shared relation */
1768 if (SlotIsLogical(s) &&
1769 (dboid == InvalidOid || dboid == s->data.database))
1770 {
1771 TransactionId effective_xmin = s->effective_xmin;
1772 TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1773
1774 if (TransactionIdIsValid(effective_xmin) &&
1775 TransactionIdPrecedesOrEquals(effective_xmin,
1776 snapshotConflictHorizon))
1777 return RS_INVAL_HORIZON;
1778 else if (TransactionIdIsValid(catalog_effective_xmin) &&
1779 TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1780 snapshotConflictHorizon))
1781 return RS_INVAL_HORIZON;
1782 }
1783 }
1784
1785 if (possible_causes & RS_INVAL_WAL_LEVEL)
1786 {
1787 if (SlotIsLogical(s))
1788 return RS_INVAL_WAL_LEVEL;
1789 }
1790
1791 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1792 {
1793 Assert(now > 0);
1794
1795 if (CanInvalidateIdleSlot(s))
1796 {
1797 /*
1798 * Simulate the invalidation due to idle_timeout to test the
1799 * timeout behavior promptly, without waiting for it to trigger
1800 * naturally.
1801 */
1802#ifdef USE_INJECTION_POINTS
1803 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1804 {
1805 *inactive_since = 0; /* since the beginning of time */
1806 return RS_INVAL_IDLE_TIMEOUT;
1807 }
1808#endif
1809
1810 /*
1811 * Check if the slot needs to be invalidated due to
1812 * idle_replication_slot_timeout GUC.
1813 */
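1814 if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1815 idle_replication_slot_timeout_secs))
1816 {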
1817 *inactive_since = s->inactive_since;
1818 return RS_INVAL_IDLE_TIMEOUT;
1819 }
1820 }
1821 }
1822
1823 return RS_INVAL_NONE;
1824}
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition: timestamp.c:1795
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
uint32 TransactionId
Definition: c.h:671
Assert(PointerIsAligned(start, uint64))
#define IS_INJECTION_POINT_ATTACHED(name)
#define InvalidOid
Definition: postgres_ext.h:37
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition: slot.c:1733
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition: slot.h:68
@ RS_INVAL_HORIZON
Definition: slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:66
TransactionId effective_catalog_xmin
Definition: slot.h:207
TransactionId effective_xmin
Definition: slot.h:206
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.h:282
#define TransactionIdIsValid(xid)
Definition: transam.h:41
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), CanInvalidateIdleSlot(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, idle_replication_slot_timeout_secs, ReplicationSlot::inactive_since, InvalidOid, IS_INJECTION_POINT_ATTACHED, now(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_HORIZON, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, RS_INVAL_WAL_REMOVED, SlotIsLogical, TimestampDifferenceExceedsSeconds(), TransactionIdIsValid, TransactionIdPrecedesOrEquals(), and XLogRecPtrIsValid.

Referenced by InvalidatePossiblyObsoleteSlot().

◆ GetSlotInvalidationCause()

ReplicationSlotInvalidationCause GetSlotInvalidationCause ( const char *  cause_name)

Definition at line 2733 of file slot.c.

2734{
2735 Assert(cause_name);
2736
2737 /* Search lookup table for the cause having this name */
2738 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2739 {
2740 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2741 return SlotInvalidationCauses[i].cause;
2742 }
2743
2744 Assert(false);
2745 return RS_INVAL_NONE; /* to keep compiler quiet */
2746}
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition: slot.c:113
#define RS_INVAL_MAX_CAUSES
Definition: slot.h:72
ReplicationSlotInvalidationCause cause
Definition: slot.c:109

References Assert(), SlotInvalidationCauseMap::cause, i, RS_INVAL_MAX_CAUSES, RS_INVAL_NONE, and SlotInvalidationCauses.

Referenced by synchronize_slots().

◆ GetSlotInvalidationCauseName()

const char * GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause  cause)

Definition at line 2753 of file slot.c.

2754{
2755 /* Search lookup table for the name of this cause */
2756 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2757 {
2758 if (SlotInvalidationCauses[i].cause == cause)
2759 return SlotInvalidationCauses[i].cause_name;
2760 }
2761
2762 Assert(false);
2763 return "none"; /* to keep compiler quiet */
2764}
const char * cause_name
Definition: slot.c:110

References Assert(), SlotInvalidationCauseMap::cause_name, i, RS_INVAL_MAX_CAUSES, and SlotInvalidationCauses.

Referenced by pg_get_replication_slots(), and ReplicationSlotAcquire().

◆ InvalidateObsoleteReplicationSlots()

bool InvalidateObsoleteReplicationSlots ( uint32  possible_causes,
XLogSegNo  oldestSegno,
Oid  dboid,
TransactionId  snapshotConflictHorizon 
)

Definition at line 2068 of file slot.c.

2071{
2072 XLogRecPtr oldestLSN;
2073 bool invalidated = false;
2074
2075 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2076 Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2077 Assert(possible_causes != RS_INVAL_NONE);
2078
2079 if (max_replication_slots == 0)
2080 return invalidated;
2081
2082 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2083
2084restart:
2085 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2086 for (int i = 0; i < max_replication_slots; i++)
2087 {
2088 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2089
2090 if (!s->in_use)
2091 continue;
2092
2093 /* Prevent invalidation of logical slots during binary upgrade */
2094 if (SlotIsLogical(s) && IsBinaryUpgrade)
2095 continue;
2096
2097 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2098 snapshotConflictHorizon,
2099 &invalidated))
2100 {
2101 /* if the lock was released, start from scratch */
2102 goto restart;
2103 }
2104 }
2105 LWLockRelease(ReplicationSlotControlLock);
2106
2107 /*
2108 * If any slots have been invalidated, recalculate the resource limits.
2109 */
2110 if (invalidated)
2111 {
2112 ReplicationSlotsComputeRequiredXmin(false);
2113 ReplicationSlotsComputeRequiredLSN();
2114 }
2115
2116 return invalidated;
2117}
bool IsBinaryUpgrade
Definition: globals.c:121
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1178
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
Definition: slot.c:1840
int wal_segment_size
Definition: xlog.c:145
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)

References Assert(), i, ReplicationSlot::in_use, InvalidatePossiblyObsoleteSlot(), IsBinaryUpgrade, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SlotIsLogical, TransactionIdIsValid, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by CreateCheckPoint(), CreateRestartPoint(), ResolveRecoveryConflictWithSnapshot(), and xlog_redo().

◆ InvalidatePossiblyObsoleteSlot()

static bool InvalidatePossiblyObsoleteSlot ( uint32  possible_causes,
ReplicationSlot * s,
XLogRecPtr  oldestLSN,
Oid  dboid,
TransactionId  snapshotConflictHorizon,
bool *  invalidated 
)
static

Definition at line 1840 of file slot.c.

1845{
1846 int last_signaled_pid = 0;
1847 bool released_lock = false;
1848 TimestampTz inactive_since = 0;
1849
1850 for (;;)
1851 {
1852 XLogRecPtr restart_lsn;
1853 NameData slotname;
1854 int active_pid = 0;
1855 ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1856 TimestampTz now = 0;
1857 long slot_idle_secs = 0;
1858
1859 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1860
1861 if (!s->in_use)
1862 {
1863 if (released_lock)
1864 LWLockRelease(ReplicationSlotControlLock);
1865 break;
1866 }
1867
1868 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1869 {
1870 /*
1871 * Assign the current time here to avoid system call overhead
1872 * while holding the spinlock in subsequent code.
1873 */
1874 now = GetCurrentTimestamp();
1875 }
1876
1877 /*
1878 * Check if the slot needs to be invalidated. If it needs to be
1879 * invalidated, and is not currently acquired, acquire it and mark it
1880 * as having been invalidated. We do this with the spinlock held to
1881 * avoid race conditions -- for example the restart_lsn could move
1882 * forward, or the slot could be dropped.
1883 */
1884 SpinLockAcquire(&s->mutex);
1885
1886 restart_lsn = s->data.restart_lsn;
1887
1888 /* we do nothing if the slot is already invalid */
1889 if (s->data.invalidated == RS_INVAL_NONE)
1890 invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
1891 s, oldestLSN,
1892 dboid,
1893 snapshotConflictHorizon,
1894 &inactive_since,
1895 now);
1896
1897 /* if there's no invalidation, we're done */
1898 if (invalidation_cause == RS_INVAL_NONE)
1899 {
1900 SpinLockRelease(&s->mutex);
1901 if (released_lock)
1902 LWLockRelease(ReplicationSlotControlLock);
1903 break;
1904 }
1905
1906 slotname = s->data.name;
1907 active_pid = s->active_pid;
1908
1909 /*
1910 * If the slot can be acquired, do so and mark it invalidated
1911 * immediately. Otherwise we'll signal the owning process, below, and
1912 * retry.
1913 *
1914 * Note: Unlike other slot attributes, slot's inactive_since can't be
1915 * changed until the acquired slot is released or the owning process
1916 * is terminated. So, the inactive slot can only be invalidated
1917 * immediately without being terminated.
1918 */
1919 if (active_pid == 0)
1920 {
1921 MyReplicationSlot = s;
1922 s->active_pid = MyProcPid;
1923 s->data.invalidated = invalidation_cause;
1924
1925 /*
1926 * XXX: We should consider not overwriting restart_lsn and instead
1927 * just rely on .invalidated.
1928 */
1929 if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1930 {
1931 s->data.restart_lsn = InvalidXLogRecPtr;
1932 s->last_saved_restart_lsn = InvalidXLogRecPtr;
1933 }
1934
1935 /* Let caller know */
1936 *invalidated = true;
1937 }
1938
1939 SpinLockRelease(&s->mutex);
1940
1941 /*
1942 * Calculate the idle time duration of the slot if slot is marked
1943 * invalidated with RS_INVAL_IDLE_TIMEOUT.
1944 */
1945 if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
1946 {
1947 int slot_idle_usecs;
1948
1949 TimestampDifference(inactive_since, now, &slot_idle_secs,
1950 &slot_idle_usecs);
1951 }
1952
1953 if (active_pid != 0)
1954 {
1955 /*
1956 * Prepare the sleep on the slot's condition variable before
1957 * releasing the lock, to close a possible race condition if the
1958 * slot is released before the sleep below.
1959 */
1960 ConditionVariablePrepareToSleep(&s->active_cv);
1961
1962 LWLockRelease(ReplicationSlotControlLock);
1963 released_lock = true;
1964
1965 /*
1966 * Signal to terminate the process that owns the slot, if we
1967 * haven't already signalled it. (Avoidance of repeated
1968 * signalling is the only reason for there to be a loop in this
1969 * routine; otherwise we could rely on caller's restart loop.)
1970 *
1971 * There is the race condition that other process may own the slot
1972 * after its current owner process is terminated and before this
1973 * process owns it. To handle that, we signal only if the PID of
1974 * the owning process has changed from the previous time. (This
1975 * logic assumes that the same PID is not reused very quickly.)
1976 */
1977 if (last_signaled_pid != active_pid)
1978 {
1979 ReportSlotInvalidation(invalidation_cause, true, active_pid,
1980 slotname, restart_lsn,
1981 oldestLSN, snapshotConflictHorizon,
1982 slot_idle_secs);
1983
1984 if (MyBackendType == B_STARTUP)
1985 (void) SendProcSignal(active_pid,
1986 PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
1987 INVALID_PROC_NUMBER);
1988 else
1989 (void) kill(active_pid, SIGTERM);
1990
1991 last_signaled_pid = active_pid;
1992 }
1993
1994 /* Wait until the slot is released. */
1995 ConditionVariableSleep(&s->active_cv,
1996 WAIT_EVENT_REPLICATION_SLOT_DROP);
1997
1998 /*
1999 * Re-acquire lock and start over; we expect to invalidate the
2000 * slot next time (unless another process acquires the slot in the
2001 * meantime).
2002 *
2003 * Note: It is possible for a slot to advance its restart_lsn or
2004 * xmin values sufficiently between when we release the mutex and
2005 * when we recheck, moving from a conflicting state to a non
2006 * conflicting state. This is intentional and safe: if the slot
2007 * has caught up while we're busy here, the resources we were
2008 * concerned about (WAL segments or tuples) have not yet been
2009 * removed, and there's no reason to invalidate the slot.
2010 */
2011 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2012 continue;
2013 }
2014 else
2015 {
2016 /*
2017 * We hold the slot now and have already invalidated it; flush it
2018 * to ensure that state persists.
2019 *
2020 * Don't want to hold ReplicationSlotControlLock across file
2021 * system operations, so release it now but be sure to tell caller
2022 * to restart from scratch.
2023 */
2024 LWLockRelease(ReplicationSlotControlLock);
2025 released_lock = true;
2026
2027 /* Make sure the invalidated state persists across server restart */
2028 ReplicationSlotMarkDirty();
2029 ReplicationSlotSave();
2030 ReplicationSlotRelease();
2031
2032 ReportSlotInvalidation(invalidation_cause, false, active_pid,
2033 slotname, restart_lsn,
2034 oldestLSN, snapshotConflictHorizon,
2035 slot_idle_secs);
2036
2037 /* done with this slot for now */
2038 break;
2039 }
2040 }
2041
2042 Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2043
2044 return released_lock;
2045}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1721
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
int MyProcPid
Definition: globals.c:47
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1977
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:2021
@ B_STARTUP
Definition: miscadmin.h:365
BackendType MyBackendType
Definition: miscinit.c:64
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition: procsignal.h:46
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1139
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition: slot.c:1749
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition: slot.c:1651
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
void ReplicationSlotSave(void)
Definition: slot.c:1121
void ReplicationSlotRelease(void)
Definition: slot.c:764
ReplicationSlotInvalidationCause
Definition: slot.h:59
pid_t active_pid
Definition: slot.h:189
ConditionVariable active_cv
Definition: slot.h:216
Definition: c.h:760
#define kill(pid, sig)
Definition: win32_port.h:490

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), B_STARTUP, ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DetermineSlotInvalidationCause(), GetCurrentTimestamp(), ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, kill, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockHeldByMeInMode(), LWLockRelease(), ReplicationSlot::mutex, MyBackendType, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::name, now(), PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReportSlotInvalidation(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and TimestampDifference().

Referenced by InvalidateObsoleteReplicationSlots().

◆ IsSlotForConflictCheck()

static bool IsSlotForConflictCheck ( const char *  name)
static

Definition at line 361 of file slot.c.

362{
363 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364}
#define CONFLICT_DETECTION_SLOT
Definition: slot.h:28
const char * name

References CONFLICT_DETECTION_SLOT, and name.

Referenced by ReplicationSlotAcquire(), and ReplicationSlotValidateNameInternal().

◆ ReplicationSlotAcquire()

void ReplicationSlotAcquire ( const char *  name,
bool  nowait,
bool  error_if_invalid 
)

Definition at line 626 of file slot.c.

627{
629 int active_pid;
630
631 Assert(name != NULL);
632
633retry:
634 Assert(MyReplicationSlot == NULL);
635
636 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
637
638 /* Check if the slot exists with the given name. */
640 if (s == NULL || !s->in_use)
641 {
642 LWLockRelease(ReplicationSlotControlLock);
643
645 (errcode(ERRCODE_UNDEFINED_OBJECT),
646 errmsg("replication slot \"%s\" does not exist",
647 name)));
648 }
649
650 /*
651 * Do not allow users to acquire the reserved slot. This scenario may
652 * occur if the launcher that owns the slot has terminated unexpectedly
653 * due to an error, and a backend process attempts to reuse the slot.
654 */
657 errcode(ERRCODE_UNDEFINED_OBJECT),
658 errmsg("cannot acquire replication slot \"%s\"", name),
659 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
660
661 /*
662 * This is the slot we want; check if it's active under some other
663 * process. In single user mode, we don't need this check.
664 */
666 {
667 /*
668 * Get ready to sleep on the slot in case it is active. (We may end
669 * up not sleeping, but we don't want to do this while holding the
670 * spinlock.)
671 */
672 if (!nowait)
674
675 /*
676 * It is important to reset the inactive_since under spinlock here to
677 * avoid race conditions with slot invalidation. See comments related
678 * to inactive_since in InvalidatePossiblyObsoleteSlot.
679 */
681 if (s->active_pid == 0)
683 active_pid = s->active_pid;
686 }
687 else
688 {
689 s->active_pid = active_pid = MyProcPid;
691 }
692 LWLockRelease(ReplicationSlotControlLock);
693
694 /*
695 * If we found the slot but it's already active in another process, we
696 * wait until the owning process signals us that it's been released, or
697 * error out.
698 */
699 if (active_pid != MyProcPid)
700 {
701 if (!nowait)
702 {
703 /* Wait here until we get signaled, and then restart */
705 WAIT_EVENT_REPLICATION_SLOT_DROP);
707 goto retry;
708 }
709
711 (errcode(ERRCODE_OBJECT_IN_USE),
712 errmsg("replication slot \"%s\" is active for PID %d",
713 NameStr(s->data.name), active_pid)));
714 }
715 else if (!nowait)
716 ConditionVariableCancelSleep(); /* no sleep needed after all */
717
718 /* We made this slot active, so it's ours now. */
720
721 /*
722 * We need to check for invalidation after making the slot ours to avoid
723 * the possible race condition with the checkpointer that can otherwise
724 * invalidate the slot immediately after the check.
725 */
726 if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
728 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
729 errmsg("can no longer access replication slot \"%s\"",
730 NameStr(s->data.name)),
731 errdetail("This replication slot has been invalidated due to \"%s\".",
733
734 /* Let everybody know we've modified this slot */
736
737 /*
738 * The call to pgstat_acquire_replslot() protects against stats for a
739 * different slot, from before a restart or such, being present during
740 * pgstat_report_replslot().
741 */
742 if (SlotIsLogical(s))
744
745
746 if (am_walsender)
747 {
750 ? errmsg("acquired logical replication slot \"%s\"",
751 NameStr(s->data.name))
752 : errmsg("acquired physical replication slot \"%s\"",
753 NameStr(s->data.name)));
754 }
755}
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
bool IsUnderPostmaster
Definition: globals.c:120
bool IsLogicalLauncher(void)
Definition: launcher.c:1585
void pgstat_acquire_replslot(ReplicationSlot *slot)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:546
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition: slot.c:2753
static bool IsSlotForConflictCheck(const char *name)
Definition: slot.c:361
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition: slot.h:303
bool am_walsender
Definition: walsender.c:123
bool log_replication_commands
Definition: walsender.c:133

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, am_walsender, Assert(), ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DEBUG1, ereport, errcode(), errdetail(), errmsg(), ERROR, GetSlotInvalidationCauseName(), ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, IsLogicalLauncher(), IsSlotForConflictCheck(), IsUnderPostmaster, LOG, log_replication_commands, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, pgstat_acquire_replslot(), ReplicationSlotSetInactiveSince(), RS_INVAL_NONE, SearchNamedReplicationSlot(), SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by acquire_conflict_slot_if_exists(), binary_upgrade_logical_slot_has_caught_up(), drop_local_obsolete_slots(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), ReplicationSlotAlter(), ReplicationSlotDrop(), StartLogicalReplication(), StartReplication(), and synchronize_one_slot().

◆ ReplicationSlotAlter()

void ReplicationSlotAlter ( const char *  name,
const bool *  failover,
const bool *  two_phase 
)

Definition at line 915 of file slot.c.

917{
918 bool update_slot = false;
919
920 Assert(MyReplicationSlot == NULL);
922
923 ReplicationSlotAcquire(name, false, true);
924
927 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
928 errmsg("cannot use %s with a physical replication slot",
929 "ALTER_REPLICATION_SLOT"));
930
931 if (RecoveryInProgress())
932 {
933 /*
934 * Do not allow users to alter the slots which are currently being
935 * synced from the primary to the standby.
936 */
939 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
940 errmsg("cannot alter replication slot \"%s\"", name),
941 errdetail("This replication slot is being synchronized from the primary server."));
942
943 /*
944 * Do not allow users to enable failover on the standby as we do not
945 * support sync to the cascading standby.
946 */
947 if (failover && *failover)
949 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
950 errmsg("cannot enable failover for a replication slot"
951 " on the standby"));
952 }
953
954 if (failover)
955 {
956 /*
957 * Do not allow users to enable failover for temporary slots as we do
958 * not support syncing temporary slots to the standby.
959 */
962 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
963 errmsg("cannot enable failover for a temporary replication slot"));
964
966 {
970
971 update_slot = true;
972 }
973 }
974
976 {
980
981 update_slot = true;
982 }
983
984 if (update_slot)
985 {
988 }
989
991}
static bool two_phase
static bool failover
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:626
@ RS_TEMPORARY
Definition: slot.h:47
#define SlotIsPhysical(slot)
Definition: slot.h:284
ReplicationSlotPersistency persistency
Definition: slot.h:106

References Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, failover, ReplicationSlotPersistentData::failover, ReplicationSlot::mutex, MyReplicationSlot, name, ReplicationSlotPersistentData::persistency, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), RS_TEMPORARY, SlotIsPhysical, SpinLockAcquire, SpinLockRelease, ReplicationSlotPersistentData::synced, two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by AlterReplicationSlot().

◆ ReplicationSlotCleanup()

void ReplicationSlotCleanup ( bool  synced_only)

Definition at line 853 of file slot.c.

854{
855 int i;
856
857 Assert(MyReplicationSlot == NULL);
858
859restart:
860 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
861 for (i = 0; i < max_replication_slots; i++)
862 {
864
865 if (!s->in_use)
866 continue;
867
869 if ((s->active_pid == MyProcPid &&
870 (!synced_only || s->data.synced)))
871 {
874 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
875
877
879 goto restart;
880 }
881 else
883 }
884
885 LWLockRelease(ReplicationSlotControlLock);
886}
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:1014

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropPtr(), RS_TEMPORARY, SpinLockAcquire, SpinLockRelease, and ReplicationSlotPersistentData::synced.

Referenced by PostgresMain(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), SyncReplicationSlots(), and WalSndErrorCleanup().

◆ ReplicationSlotCreate()

void ReplicationSlotCreate ( const char *  name,
bool  db_specific,
ReplicationSlotPersistency  persistency,
bool  two_phase,
bool  failover,
bool  synced 
)

Definition at line 384 of file slot.c.

387{
388 ReplicationSlot *slot = NULL;
389 int i;
390
391 Assert(MyReplicationSlot == NULL);
392
393 /*
394 * The logical launcher or pg_upgrade may create or migrate an internal
395 * slot, so using a reserved name is allowed in these cases.
396 */
398 ERROR);
399
400 if (failover)
401 {
402 /*
403 * Do not allow users to create the failover enabled slots on the
404 * standby as we do not support sync to the cascading standby.
405 *
406 * However, failover enabled slots can be created during slot
407 * synchronization because we need to retain the same values as the
408 * remote slot.
409 */
412 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
413 errmsg("cannot enable failover for a replication slot created on the standby"));
414
415 /*
416 * Do not allow users to create failover enabled temporary slots,
417 * because temporary slots will not be synced to the standby.
418 *
419 * However, failover enabled temporary slots can be created during
420 * slot synchronization. See the comments atop slotsync.c for details.
421 */
422 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
424 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
425 errmsg("cannot enable failover for a temporary replication slot"));
426 }
427
428 /*
429 * If some other backend ran this code concurrently with us, we'd likely
430 * both allocate the same slot, and that would be bad. We'd also be at
431 * risk of missing a name collision. Also, we don't want to try to create
432 * a new slot while somebody's busy cleaning up an old one, because we
433 * might both be monkeying with the same directory.
434 */
435 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
436
437 /*
438 * Check for name collision, and identify an allocatable slot. We need to
439 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
440 * else can change the in_use flags while we're looking at them.
441 */
442 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
443 for (i = 0; i < max_replication_slots; i++)
444 {
446
447 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
450 errmsg("replication slot \"%s\" already exists", name)));
451 if (!s->in_use && slot == NULL)
452 slot = s;
453 }
454 LWLockRelease(ReplicationSlotControlLock);
455
456 /* If all slots are in use, we're out of luck. */
457 if (slot == NULL)
459 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
460 errmsg("all replication slots are in use"),
461 errhint("Free one or increase \"max_replication_slots\".")));
462
463 /*
464 * Since this slot is not in use, nobody should be looking at any part of
465 * it other than the in_use field unless they're trying to allocate it.
466 * And since we hold ReplicationSlotAllocationLock, nobody except us can
467 * be doing that. So it's safe to initialize the slot.
468 */
469 Assert(!slot->in_use);
470 Assert(slot->active_pid == 0);
471
472 /* first initialize persistent data */
473 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
474 namestrcpy(&slot->data.name, name);
475 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
476 slot->data.persistency = persistency;
477 slot->data.two_phase = two_phase;
479 slot->data.failover = failover;
480 slot->data.synced = synced;
481
482 /* and then data only present in shared memory */
483 slot->just_dirtied = false;
484 slot->dirty = false;
493 slot->inactive_since = 0;
495
496 /*
497 * Create the slot on disk. We haven't actually marked the slot allocated
498 * yet, so no special cleanup is required if this errors out.
499 */
500 CreateSlotOnDisk(slot);
501
502 /*
503 * We need to briefly prevent any other backend from iterating over the
504 * slots while we flip the in_use flag. We also need to set the active
505 * flag while holding the ControlLock as otherwise a concurrent
506 * ReplicationSlotAcquire() could acquire the slot as well.
507 */
508 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
509
510 slot->in_use = true;
511
512 /* We can now mark the slot active, and that makes it our slot. */
513 SpinLockAcquire(&slot->mutex);
514 Assert(slot->active_pid == 0);
515 slot->active_pid = MyProcPid;
516 SpinLockRelease(&slot->mutex);
517 MyReplicationSlot = slot;
518
519 LWLockRelease(ReplicationSlotControlLock);
520
521 /*
522 * Create statistics entry for the new logical slot. We don't collect any
523 * stats for physical slots, so no need to create an entry for the same.
524 * See ReplicationSlotDropPtr for why we need to do this before releasing
525 * ReplicationSlotAllocationLock.
526 */
527 if (SlotIsLogical(slot))
529
530 /*
531 * Now that the slot has been marked as in_use and active, it's safe to
532 * let somebody else try to allocate a slot.
533 */
534 LWLockRelease(ReplicationSlotAllocationLock);
535
536 /* Let everybody know we've modified this slot */
538}
int errhint(const char *fmt,...)
Definition: elog.c:1330
Oid MyDatabaseId
Definition: globals.c:94
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void pgstat_create_replslot(ReplicationSlot *slot)
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:2267
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:266
@ SS_SKIP_NONE
Definition: slot.h:82
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1797
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:226
XLogRecPtr candidate_restart_valid
Definition: slot.h:227
SlotSyncSkipReason slotsync_skip_reason
Definition: slot.h:281
XLogRecPtr candidate_restart_lsn
Definition: slot.h:228
TransactionId candidate_catalog_xmin
Definition: slot.h:225
#define InvalidTransactionId
Definition: transam.h:31

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, Assert(), ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ConditionVariableBroadcast(), CreateSlotOnDisk(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::dirty, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, failover, ReplicationSlotPersistentData::failover, i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsBinaryUpgrade, IsLogicalLauncher(), IsSyncingReplicationSlots(), ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyDatabaseId, MyProcPid, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, pgstat_create_replslot(), RecoveryInProgress(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotValidateName(), RS_TEMPORARY, SlotIsLogical, ReplicationSlot::slotsync_skip_reason, SpinLockAcquire, SpinLockRelease, SS_SKIP_NONE, ReplicationSlotPersistentData::synced, two_phase, ReplicationSlotPersistentData::two_phase, and ReplicationSlotPersistentData::two_phase_at.

Referenced by create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateReplicationSlot(), and synchronize_one_slot().

◆ ReplicationSlotDrop()

void ReplicationSlotDrop ( const char *  name,
bool  nowait 
)

Definition at line 892 of file slot.c.

893{
894 Assert(MyReplicationSlot == NULL);
895
896 ReplicationSlotAcquire(name, nowait, false);
897
898 /*
899 * Do not allow users to drop the slots which are currently being synced
900 * from the primary to the standby.
901 */
904 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
905 errmsg("cannot drop replication slot \"%s\"", name),
906 errdetail("This replication slot is being synchronized from the primary server."));
907
909}
void ReplicationSlotDropAcquired(void)
Definition: slot.c:997

References Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg(), ERROR, MyReplicationSlot, name, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), and ReplicationSlotPersistentData::synced.

Referenced by DropReplicationSlot(), and pg_drop_replication_slot().

◆ ReplicationSlotDropAcquired()

void ReplicationSlotDropAcquired ( void  )

Definition at line 997 of file slot.c.

998{
1000
1001 Assert(MyReplicationSlot != NULL);
1002
1003 /* slot isn't acquired anymore */
1004 MyReplicationSlot = NULL;
1005
1007}

References Assert(), MyReplicationSlot, and ReplicationSlotDropPtr().

Referenced by ApplyLauncherMain(), drop_local_obsolete_slots(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ ReplicationSlotDropPtr()

static void ReplicationSlotDropPtr ( ReplicationSlot slot)
static

Definition at line 1014 of file slot.c.

1015{
1016 char path[MAXPGPATH];
1017 char tmppath[MAXPGPATH];
1018
1019 /*
1020 * If some other backend ran this code concurrently with us, we might try
1021 * to delete a slot with a certain name while someone else was trying to
1022 * create a slot with the same name.
1023 */
1024 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1025
1026 /* Generate pathnames. */
1027 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1028 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1029
1030 /*
1031 * Rename the slot directory on disk, so that we'll no longer recognize
1032 * this as a valid slot. Note that if this fails, we've got to mark the
1033 * slot inactive before bailing out. If we're dropping an ephemeral or a
1034 * temporary slot, we better never fail hard as the caller won't expect
1035 * the slot to survive and this might get called during error handling.
1036 */
1037 if (rename(path, tmppath) == 0)
1038 {
1039 /*
1040 * We need to fsync() the directory we just renamed and its parent to
1041 * make sure that our changes are on disk in a crash-safe fashion. If
1042 * fsync() fails, we can't be sure whether the changes are on disk or
1043 * not. For now, we handle that by panicking;
1044 * StartupReplicationSlots() will try to straighten it out after
1045 * restart.
1046 */
1048 fsync_fname(tmppath, true);
1051 }
1052 else
1053 {
1054 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1055
1056 SpinLockAcquire(&slot->mutex);
1057 slot->active_pid = 0;
1058 SpinLockRelease(&slot->mutex);
1059
1060 /* wake up anyone waiting on this slot */
1062
1063 ereport(fail_softly ? WARNING : ERROR,
1065 errmsg("could not rename file \"%s\" to \"%s\": %m",
1066 path, tmppath)));
1067 }
1068
1069 /*
1070 * The slot is definitely gone. Lock out concurrent scans of the array
1071 * long enough to kill it. It's OK to clear the active PID here without
1072 * grabbing the mutex because nobody else can be scanning the array here,
1073 * and nobody can be attached to this slot and thus access it without
1074 * scanning the array.
1075 *
1076 * Also wake up processes waiting for it.
1077 */
1078 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1079 slot->active_pid = 0;
1080 slot->in_use = false;
1081 LWLockRelease(ReplicationSlotControlLock);
1083
1084 /*
1085 * Slot is dead and doesn't prevent resource removal anymore, recompute
1086 * limits.
1087 */
1090
1091 /*
1092 * If removing the directory fails, the worst thing that will happen is
1093 * that the user won't be able to create a new slot with the same name
1094 * until the next server restart. We warn about it, but that's all.
1095 */
1096 if (!rmtree(tmppath, true))
1098 (errmsg("could not remove directory \"%s\"", tmppath)));
1099
1100 /*
1101 * Drop the statistics entry for the replication slot. Do this while
1102 * holding ReplicationSlotAllocationLock so that we don't drop a
1103 * statistics entry for another slot with the same name just created in
1104 * another session.
1105 */
1106 if (SlotIsLogical(slot))
1108
1109 /*
1110 * We release this at the very end, so that nobody starts trying to create
1111 * a slot while we're still cleaning up the detritus of the old one.
1112 */
1113 LWLockRelease(ReplicationSlotAllocationLock);
1114}
#define WARNING
Definition: elog.h:36
void pgstat_drop_replslot(ReplicationSlot *slot)
@ RS_PERSISTENT
Definition: slot.h:45

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, ConditionVariableBroadcast(), ReplicationSlot::data, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg(), ERROR, fsync_fname(), ReplicationSlot::in_use, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotPersistentData::persistency, PG_REPLSLOT_DIR, pgstat_drop_replslot(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), rmtree(), RS_PERSISTENT, SlotIsLogical, SpinLockAcquire, SpinLockRelease, sprintf, START_CRIT_SECTION, and WARNING.

Referenced by ReplicationSlotCleanup(), and ReplicationSlotDropAcquired().

◆ ReplicationSlotIndex()

◆ ReplicationSlotInitialize()

void ReplicationSlotInitialize ( void  )

Definition at line 241 of file slot.c.

242{
244}
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:250

References before_shmem_exit(), and ReplicationSlotShmemExit().

Referenced by BaseInit().

◆ ReplicationSlotMarkDirty()

◆ ReplicationSlotName()

bool ReplicationSlotName ( int  index,
Name  name 
)

Definition at line 595 of file slot.c.

596{
597 ReplicationSlot *slot;
598 bool found;
599
601
602 /*
603 * Ensure that the slot cannot be dropped while we copy the name. Don't
604 * need the spinlock as the name of an existing slot cannot change.
605 */
606 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
607 found = slot->in_use;
608 if (slot->in_use)
610 LWLockRelease(ReplicationSlotControlLock);
611
612 return found;
613}
Definition: type.h:96

References ReplicationSlot::data, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by pgstat_replslot_to_serialized_name_cb().

◆ ReplicationSlotPersist()

◆ ReplicationSlotRelease()

void ReplicationSlotRelease ( void  )

Definition at line 764 of file slot.c.

765{
767 char *slotname = NULL; /* keep compiler quiet */
768 bool is_logical = false; /* keep compiler quiet */
769 TimestampTz now = 0;
770
771 Assert(slot != NULL && slot->active_pid != 0);
772
773 if (am_walsender)
774 {
775 slotname = pstrdup(NameStr(slot->data.name));
776 is_logical = SlotIsLogical(slot);
777 }
778
779 if (slot->data.persistency == RS_EPHEMERAL)
780 {
781 /*
782 * Delete the slot. There is no !PANIC case where this is allowed to
783 * fail, all that may happen is an incomplete cleanup of the on-disk
784 * data.
785 */
787 }
788
789 /*
790 * If slot needed to temporarily restrain both data and catalog xmin to
791 * create the catalog snapshot, remove that temporary constraint.
792 * Snapshots can only be exported while the initial snapshot is still
793 * acquired.
794 */
795 if (!TransactionIdIsValid(slot->data.xmin) &&
797 {
798 SpinLockAcquire(&slot->mutex);
800 SpinLockRelease(&slot->mutex);
802 }
803
804 /*
805 * Set the time since the slot has become inactive. We get the current
806 * time beforehand to avoid system call while holding the spinlock.
807 */
809
810 if (slot->data.persistency == RS_PERSISTENT)
811 {
812 /*
813 * Mark persistent slot inactive. We're not freeing it, just
814 * disconnecting, but wake up others that may be waiting for it.
815 */
816 SpinLockAcquire(&slot->mutex);
817 slot->active_pid = 0;
819 SpinLockRelease(&slot->mutex);
821 }
822 else
824
825 MyReplicationSlot = NULL;
826
827 /* might not have been set when we've been a plain slot */
828 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
829 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
831 LWLockRelease(ProcArrayLock);
832
833 if (am_walsender)
834 {
836 is_logical
837 ? errmsg("released logical replication slot \"%s\"",
838 slotname)
839 : errmsg("released physical replication slot \"%s\"",
840 slotname));
841
842 pfree(slotname);
843 }
844}
@ RS_EPHEMERAL
Definition: slot.h:46
PGPROC * MyProc
Definition: proc.c:67
PROC_HDR * ProcGlobal
Definition: proc.c:79
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
TransactionId xmin
Definition: slot.h:114

References ReplicationSlot::active_cv, ReplicationSlot::active_pid, am_walsender, Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, ereport, errmsg(), GetCurrentTimestamp(), InvalidTransactionId, LOG, log_replication_commands, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, now(), ReplicationSlotPersistentData::persistency, pfree(), PGPROC::pgxactoff, ProcGlobal, pstrdup(), ReplicationSlotDropAcquired(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotSetInactiveSince(), RS_EPHEMERAL, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire, SpinLockRelease, PGPROC::statusFlags, PROC_HDR::statusFlags, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by binary_upgrade_create_conflict_detection_slot(), binary_upgrade_logical_slot_has_caught_up(), copy_replication_slot(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), PostgresMain(), ReplicationSlotAlter(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), synchronize_one_slot(), and WalSndErrorCleanup().

◆ ReplicationSlotReserveWal()

void ReplicationSlotReserveWal ( void  )

Definition at line 1572 of file slot.c.

1573{
1575 XLogSegNo segno;
1576 XLogRecPtr restart_lsn;
1577
1578 Assert(slot != NULL);
1581
1582 /*
1583 * The replication slot mechanism is used to prevent the removal of
1584 * required WAL.
1585 *
1586 * Acquire an exclusive lock to prevent the checkpoint process from
1587 * concurrently computing the minimum slot LSN (see
1588 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1589 * replication cannot be removed during a checkpoint.
1590 *
1591 * The mechanism is reliable because if WAL reservation occurs first, the
1592 * checkpoint must wait for the restart_lsn update before determining the
1593 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1594 * first, subsequent WAL reservations will select positions at or beyond
1595 * the redo pointer of that checkpoint.
1596 */
1597 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1598
1599 /*
1600 * For logical slots log a standby snapshot and start logical decoding at
1601 * exactly that position. That allows the slot to start up more quickly.
1602 * But on a standby we cannot do WAL writes, so just use the replay
1603 * pointer; effectively, an attempt to create a logical slot on standby
1604 * will cause it to wait for an xl_running_xact record to be logged
1605 * independently on the primary, so that a snapshot can be built using the
1606 * record.
1607 *
1608 * None of this is needed (or indeed helpful) for physical slots as
1609 * they'll start replay at the last logged checkpoint anyway. Instead,
1610 * return the location of the last redo LSN, where a base backup has to
1611 * start replay at.
1612 */
1613 if (SlotIsPhysical(slot))
1614 restart_lsn = GetRedoRecPtr();
1615 else if (RecoveryInProgress())
1616 restart_lsn = GetXLogReplayRecPtr(NULL);
1617 else
1618 restart_lsn = GetXLogInsertRecPtr();
1619
1620 SpinLockAcquire(&slot->mutex);
1621 slot->data.restart_lsn = restart_lsn;
1622 SpinLockRelease(&slot->mutex);
1623
1624 /* prevent WAL removal as fast as possible */
1626
1627 /* Checkpoint shouldn't remove the required WAL. */
1629 if (XLogGetLastRemovedSegno() >= segno)
1630 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1631 NameStr(slot->data.name));
1632
1633 LWLockRelease(ReplicationSlotAllocationLock);
1634
1635 if (!RecoveryInProgress() && SlotIsLogical(slot))
1636 {
1637 XLogRecPtr flushptr;
1638
1639 /* make sure we have enough information to start */
1640 flushptr = LogStandbySnapshot();
1641
1642 /* and make sure it's fsynced to disk */
1643 XLogFlush(flushptr);
1644 }
1645}
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1282
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3779
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6507
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9497
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2783
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References Assert(), ReplicationSlot::data, elog, ERROR, GetRedoRecPtr(), GetXLogInsertRecPtr(), GetXLogReplayRecPtr(), ReplicationSlot::last_saved_restart_lsn, LogStandbySnapshot(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SlotIsLogical, SlotIsPhysical, SpinLockAcquire, SpinLockRelease, wal_segment_size, XLByteToSeg, XLogFlush(), XLogGetLastRemovedSegno(), and XLogRecPtrIsValid.

Referenced by create_physical_replication_slot(), CreateInitDecodingContext(), and CreateReplicationSlot().

◆ ReplicationSlotSave()

◆ ReplicationSlotsComputeLogicalRestartLSN()

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN ( void  )

Definition at line 1304 of file slot.c.

1305{
1307 int i;
1308
1309 if (max_replication_slots <= 0)
1310 return InvalidXLogRecPtr;
1311
1312 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1313
1314 for (i = 0; i < max_replication_slots; i++)
1315 {
1316 ReplicationSlot *s;
1317 XLogRecPtr restart_lsn;
1318 XLogRecPtr last_saved_restart_lsn;
1319 bool invalidated;
1320 ReplicationSlotPersistency persistency;
1321
1323
1324 /* cannot change while ReplicationSlotControlLock is held */
1325 if (!s->in_use)
1326 continue;
1327
1328 /* we're only interested in logical slots */
1329 if (!SlotIsLogical(s))
1330 continue;
1331
1332 /* read once, it's ok if it increases while we're checking */
1334 persistency = s->data.persistency;
1335 restart_lsn = s->data.restart_lsn;
1336 invalidated = s->data.invalidated != RS_INVAL_NONE;
1337 last_saved_restart_lsn = s->last_saved_restart_lsn;
1339
1340 /* invalidated slots need not apply */
1341 if (invalidated)
1342 continue;
1343
1344 /*
1345 * For persistent slots, use last_saved_restart_lsn to compute the
1346 * oldest LSN for removal of WAL segments. The segments between
1347 * last_saved_restart_lsn and restart_lsn might be needed by a
1348 * persistent slot in case of a database crash. Non-persistent
1349 * slots can't survive a database crash, so we don't care about
1350 * last_saved_restart_lsn for them.
1351 */
1352 if (persistency == RS_PERSISTENT)
1353 {
1354 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1355 restart_lsn > last_saved_restart_lsn)
1356 {
1357 restart_lsn = last_saved_restart_lsn;
1358 }
1359 }
1360
1361 if (!XLogRecPtrIsValid(restart_lsn))
1362 continue;
1363
1364 if (!XLogRecPtrIsValid(result) ||
1365 restart_lsn < result)
1366 result = restart_lsn;
1367 }
1368
1369 LWLockRelease(ReplicationSlotControlLock);
1370
1371 return result;
1372}
ReplicationSlotPersistency
Definition: slot.h:44

References ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire, SpinLockRelease, and XLogRecPtrIsValid.

Referenced by CheckPointLogicalRewriteHeap(), and CheckPointSnapBuild().

◆ ReplicationSlotsComputeRequiredLSN()

void ReplicationSlotsComputeRequiredLSN ( void  )

Definition at line 1234 of file slot.c.

1235{
1236 int i;
1237 XLogRecPtr min_required = InvalidXLogRecPtr;
1238
1239 Assert(ReplicationSlotCtl != NULL);
1240
1241 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1242 for (i = 0; i < max_replication_slots; i++)
1243 {
1245 XLogRecPtr restart_lsn;
1246 XLogRecPtr last_saved_restart_lsn;
1247 bool invalidated;
1248 ReplicationSlotPersistency persistency;
1249
1250 if (!s->in_use)
1251 continue;
1252
1254 persistency = s->data.persistency;
1255 restart_lsn = s->data.restart_lsn;
1256 invalidated = s->data.invalidated != RS_INVAL_NONE;
1257 last_saved_restart_lsn = s->last_saved_restart_lsn;
1259
1260 /* invalidated slots need not apply */
1261 if (invalidated)
1262 continue;
1263
1264 /*
1265 * For persistent slots, use last_saved_restart_lsn to compute the
1266 * oldest LSN for removal of WAL segments. The segments between
1267 * last_saved_restart_lsn and restart_lsn might be needed by a
1268 * persistent slot in case of a database crash. Non-persistent
1269 * slots can't survive a database crash, so we don't care about
1270 * last_saved_restart_lsn for them.
1271 */
1272 if (persistency == RS_PERSISTENT)
1273 {
1274 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1275 restart_lsn > last_saved_restart_lsn)
1276 {
1277 restart_lsn = last_saved_restart_lsn;
1278 }
1279 }
1280
1281 if (XLogRecPtrIsValid(restart_lsn) &&
1282 (!XLogRecPtrIsValid(min_required) ||
1283 restart_lsn < min_required))
1284 min_required = restart_lsn;
1285 }
1286 LWLockRelease(ReplicationSlotControlLock);
1287
1289}
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2669

References Assert(), ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SpinLockAcquire, SpinLockRelease, XLogRecPtrIsValid, and XLogSetReplicationSlotMinimumLSN().

Referenced by CheckPointReplicationSlots(), copy_replication_slot(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), ReplicationSlotDropPtr(), ReplicationSlotReserveWal(), reserve_wal_for_local_slot(), StartupReplicationSlots(), and update_local_synced_slot().

◆ ReplicationSlotsComputeRequiredXmin()

void ReplicationSlotsComputeRequiredXmin ( bool  already_locked)

Definition at line 1178 of file slot.c.

1179{
1180 int i;
1182 TransactionId agg_catalog_xmin = InvalidTransactionId;
1183
1184 Assert(ReplicationSlotCtl != NULL);
1185
1186 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1187
1188 for (i = 0; i < max_replication_slots; i++)
1189 {
1191 TransactionId effective_xmin;
1192 TransactionId effective_catalog_xmin;
1193 bool invalidated;
1194
1195 if (!s->in_use)
1196 continue;
1197
1199 effective_xmin = s->effective_xmin;
1200 effective_catalog_xmin = s->effective_catalog_xmin;
1201 invalidated = s->data.invalidated != RS_INVAL_NONE;
1203
1204 /* invalidated slots need not apply */
1205 if (invalidated)
1206 continue;
1207
1208 /* check the data xmin */
1209 if (TransactionIdIsValid(effective_xmin) &&
1210 (!TransactionIdIsValid(agg_xmin) ||
1211 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1212 agg_xmin = effective_xmin;
1213
1214 /* check the catalog xmin */
1215 if (TransactionIdIsValid(effective_catalog_xmin) &&
1216 (!TransactionIdIsValid(agg_catalog_xmin) ||
1217 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1218 agg_catalog_xmin = effective_catalog_xmin;
1219 }
1220
1221 LWLockRelease(ReplicationSlotControlLock);
1222
1223 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1224}
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3903
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References Assert(), ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ProcArraySetReplicationSlotXmin(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by copy_replication_slot(), CreateInitDecodingContext(), init_conflict_slot_xmin(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalReplicationSlotNewXmin(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), StartupReplicationSlots(), synchronize_one_slot(), update_conflict_slot_xmin(), and update_local_synced_slot().

◆ ReplicationSlotsCountDBSlots()

bool ReplicationSlotsCountDBSlots ( Oid  dboid,
int *  nslots,
int *  nactive 
)

Definition at line 1383 of file slot.c.

1384{
1385 int i;
1386
1387 *nslots = *nactive = 0;
1388
1389 if (max_replication_slots <= 0)
1390 return false;
1391
1392 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1393 for (i = 0; i < max_replication_slots; i++)
1394 {
1395 ReplicationSlot *s;
1396
1398
1399 /* cannot change while ReplicationSlotControlLock is held */
1400 if (!s->in_use)
1401 continue;
1402
1403 /* only logical slots are database specific, skip */
1404 if (!SlotIsLogical(s))
1405 continue;
1406
1407 /* not our database, skip */
1408 if (s->data.database != dboid)
1409 continue;
1410
1411 /* NB: intentionally counting invalidated slots */
1412
1413 /* count slots with spinlock held */
1415 (*nslots)++;
1416 if (s->active_pid != 0)
1417 (*nactive)++;
1419 }
1420 LWLockRelease(ReplicationSlotControlLock);
1421
1422 if (*nslots > 0)
1423 return true;
1424 return false;
1425}

References ReplicationSlot::active_pid, ReplicationSlot::data, ReplicationSlotPersistentData::database, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by dropdb().

◆ ReplicationSlotsDropDBSlots()

void ReplicationSlotsDropDBSlots ( Oid  dboid)

Definition at line 1441 of file slot.c.

1442{
1443 int i;
1444
1445 if (max_replication_slots <= 0)
1446 return;
1447
1448restart:
1449 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1450 for (i = 0; i < max_replication_slots; i++)
1451 {
1452 ReplicationSlot *s;
1453 char *slotname;
1454 int active_pid;
1455
1457
1458 /* cannot change while ReplicationSlotControlLock is held */
1459 if (!s->in_use)
1460 continue;
1461
1462 /* only logical slots are database specific, skip */
1463 if (!SlotIsLogical(s))
1464 continue;
1465
1466 /* not our database, skip */
1467 if (s->data.database != dboid)
1468 continue;
1469
1470 /* NB: intentionally including invalidated slots */
1471
1472 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1474 /* can't change while ReplicationSlotControlLock is held */
1475 slotname = NameStr(s->data.name);
1476 active_pid = s->active_pid;
1477 if (active_pid == 0)
1478 {
1480 s->active_pid = MyProcPid;
1481 }
1483
1484 /*
1485 * Even though we hold an exclusive lock on the database object a
1486 * logical slot for that DB can still be active, e.g. if it's
1487 * concurrently being dropped by a backend connected to another DB.
1488 *
1489 * That's fairly unlikely in practice, so we'll just bail out.
1490 *
1491 * The slot sync worker holds a shared lock on the database before
1492 * operating on synced logical slots to avoid conflict with the drop
1493 * happening here. The persistent synced slots are thus safe but there
1494 * is a possibility that the slot sync worker has created a temporary
1495 * slot (which stays active even on release) and we are trying to drop
1496 * that here. In practice, the chances of hitting this scenario are
1497 * low because, during slot synchronization, the temporary slot is
1498 * immediately converted to persistent and thus is safe due to the
1499 * shared lock taken on the database. So, we'll just bail out in such
1500 * a case.
1501 *
1502 * XXX: We can consider shutting down the slot sync worker before
1503 * trying to drop synced temporary slots here.
1504 */
1505 if (active_pid)
1506 ereport(ERROR,
1507 (errcode(ERRCODE_OBJECT_IN_USE),
1508 errmsg("replication slot \"%s\" is active for PID %d",
1509 slotname, active_pid)));
1510
1511 /*
1512 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1513 * holding ReplicationSlotControlLock over filesystem operations,
1514 * release ReplicationSlotControlLock and use
1515 * ReplicationSlotDropAcquired.
1516 *
1517 * As that means the set of slots could change, restart scan from the
1518 * beginning each time we release the lock.
1519 */
1520 LWLockRelease(ReplicationSlotControlLock);
1522 goto restart;
1523 }
1524 LWLockRelease(ReplicationSlotControlLock);
1525}

References ReplicationSlot::active_pid, ReplicationSlot::data, ReplicationSlotPersistentData::database, ereport, errcode(), errmsg(), ERROR, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, MyProcPid, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropAcquired(), SlotIsLogical, SpinLockAcquire, and SpinLockRelease.

Referenced by dbase_redo(), and dropdb().

◆ ReplicationSlotShmemExit()

static void ReplicationSlotShmemExit ( int  code,
Datum  arg 
)
static

Definition at line 250 of file slot.c.

251{
252 /* Make sure active replication slots are released */
253 if (MyReplicationSlot != NULL)
255
256 /* Also cleanup all the temporary slots. */
258}
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:853

References MyReplicationSlot, ReplicationSlotCleanup(), and ReplicationSlotRelease().

Referenced by ReplicationSlotInitialize().

◆ ReplicationSlotsShmemInit()

void ReplicationSlotsShmemInit ( void  )

Definition at line 206 of file slot.c.

207{
208 bool found;
209
210 if (max_replication_slots == 0)
211 return;
212
214 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 &found);
216
217 if (!found)
218 {
219 int i;
220
221 /* First time through, so initialize */
223
224 for (i = 0; i < max_replication_slots; i++)
225 {
227
228 /* everything else is zeroed by the memset above */
229 SpinLockInit(&slot->mutex);
231 LWTRANCHE_REPLICATION_SLOT_IO);
233 }
234 }
235}
#define MemSet(start, val, len)
Definition: c.h:1032
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:698
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:188
#define SpinLockInit(lock)
Definition: spin.h:57
LWLock io_in_progress_lock
Definition: slot.h:213

References ReplicationSlot::active_cv, ConditionVariableInit(), i, ReplicationSlot::io_in_progress_lock, LWLockInitialize(), max_replication_slots, MemSet, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsShmemSize(), ShmemInitStruct(), and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ ReplicationSlotsShmemSize()

Size ReplicationSlotsShmemSize ( void  )

Definition at line 188 of file slot.c.

189{
190 Size size = 0;
191
192 if (max_replication_slots == 0)
193 return size;
194
195 size = offsetof(ReplicationSlotCtlData, replication_slots);
196 size = add_size(size,
198
199 return size;
200}
size_t Size
Definition: c.h:624
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_replication_slots, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationSlotsShmemInit().

◆ ReplicationSlotValidateName()

bool ReplicationSlotValidateName ( const char *  name,
bool  allow_reserved_name,
int  elevel 
)

Definition at line 266 of file slot.c.

268{
269 int err_code;
270 char *err_msg = NULL;
271 char *err_hint = NULL;
272
273 if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
274 &err_code, &err_msg, &err_hint))
275 {
276 /*
277 * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 * and errhint(), since the messages from
279 * ReplicationSlotValidateNameInternal() are already translated. This
280 * avoids double translation.
281 */
282 ereport(elevel,
283 errcode(err_code),
284 errmsg_internal("%s", err_msg),
285 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286
287 pfree(err_msg);
288 if (err_hint != NULL)
289 pfree(err_hint);
290 return false;
291 }
292
293 return true;
294}
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errhint_internal(const char *fmt,...)
Definition: elog.c:1352
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition: slot.c:311

References ereport, errcode(), errhint_internal(), errmsg_internal(), name, pfree(), and ReplicationSlotValidateNameInternal().

Referenced by parse_subscription_options(), ReplicationSlotCreate(), and StartupReorderBuffer().

◆ ReplicationSlotValidateNameInternal()

bool ReplicationSlotValidateNameInternal ( const char *  name,
bool  allow_reserved_name,
int *  err_code,
char **  err_msg,
char **  err_hint 
)

Definition at line 311 of file slot.c.

313{
314 const char *cp;
315
316 if (strlen(name) == 0)
317 {
318 *err_code = ERRCODE_INVALID_NAME;
319 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 *err_hint = NULL;
321 return false;
322 }
323
324 if (strlen(name) >= NAMEDATALEN)
325 {
326 *err_code = ERRCODE_NAME_TOO_LONG;
327 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 *err_hint = NULL;
329 return false;
330 }
331
332 for (cp = name; *cp; cp++)
333 {
334 if (!((*cp >= 'a' && *cp <= 'z')
335 || (*cp >= '0' && *cp <= '9')
336 || (*cp == '_')))
337 {
338 *err_code = ERRCODE_INVALID_NAME;
339 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 return false;
342 }
343 }
344
345 if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 {
347 *err_code = ERRCODE_RESERVED_NAME;
348 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
351 return false;
352 }
353
354 return true;
355}
#define _(x)
Definition: elog.c:91
#define NAMEDATALEN
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43

References _, CONFLICT_DETECTION_SLOT, IsSlotForConflictCheck(), name, NAMEDATALEN, and psprintf().

Referenced by check_primary_slot_name(), ReplicationSlotValidateName(), and validate_sync_standby_slots().

◆ ReportSlotInvalidation()

static void ReportSlotInvalidation ( ReplicationSlotInvalidationCause  cause,
bool  terminating,
int  pid,
NameData  slotname,
XLogRecPtr  restart_lsn,
XLogRecPtr  oldestLSN,
TransactionId  snapshotConflictHorizon,
long  slot_idle_seconds 
)
static

Definition at line 1651 of file slot.c.

1659{
1660 StringInfoData err_detail;
1661 StringInfoData err_hint;
1662
1663 initStringInfo(&err_detail);
1664 initStringInfo(&err_hint);
1665
1666 switch (cause)
1667 {
1669 {
1670 uint64 ex = oldestLSN - restart_lsn;
1671
1672 appendStringInfo(&err_detail,
1673 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1674 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1675 ex),
1676 LSN_FORMAT_ARGS(restart_lsn),
1677 ex);
1678 /* translator: %s is a GUC variable name */
1679 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1680 "max_slot_wal_keep_size");
1681 break;
1682 }
1683 case RS_INVAL_HORIZON:
1684 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1685 snapshotConflictHorizon);
1686 break;
1687
1688 case RS_INVAL_WAL_LEVEL:
1689 appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1690 break;
1691
1693 {
1694 /* translator: %s is a GUC variable name */
1695 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1696 slot_idle_seconds, "idle_replication_slot_timeout",
1698 /* translator: %s is a GUC variable name */
1699 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1700 "idle_replication_slot_timeout");
1701 break;
1702 }
1703 case RS_INVAL_NONE:
1705 }
1706
1707 ereport(LOG,
1708 terminating ?
1709 errmsg("terminating process %d to release replication slot \"%s\"",
1710 pid, NameStr(slotname)) :
1711 errmsg("invalidating obsolete replication slot \"%s\"",
1712 NameStr(slotname)),
1713 errdetail_internal("%s", err_detail.data),
1714 err_hint.len ? errhint("%s", err_hint.data) : 0);
1715
1716 pfree(err_detail.data);
1717 pfree(err_hint.data);
1718}
#define ngettext(s, p, n)
Definition: c.h:1179
uint64_t uint64
Definition: c.h:553
#define pg_unreachable()
Definition: c.h:347
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1243
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47

References _, appendStringInfo(), appendStringInfoString(), StringInfoData::data, ereport, errdetail_internal(), errhint(), errmsg(), idle_replication_slot_timeout_secs, initStringInfo(), StringInfoData::len, LOG, LSN_FORMAT_ARGS, NameStr, ngettext, pfree(), pg_unreachable, RS_INVAL_HORIZON, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, and RS_INVAL_WAL_REMOVED.

Referenced by InvalidatePossiblyObsoleteSlot().

◆ RestoreSlotFromDisk()

static void RestoreSlotFromDisk ( const char *  name)
static

Definition at line 2491 of file slot.c.

2492{
2494 int i;
2495 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2496 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2497 int fd;
2498 bool restored = false;
2499 int readBytes;
2500 pg_crc32c checksum;
2501 TimestampTz now = 0;
2502
2503 /* no need to lock here, no concurrent access allowed yet */
2504
2505 /* delete temp file if it exists */
2506 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2507 sprintf(path, "%s/state.tmp", slotdir);
2508 if (unlink(path) < 0 && errno != ENOENT)
2509 ereport(PANIC,
2511 errmsg("could not remove file \"%s\": %m", path)));
2512
2513 sprintf(path, "%s/state", slotdir);
2514
2515 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2516
2517 /* on some operating systems fsyncing a file requires O_RDWR */
2518 fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2519
2520 /*
2521 * We do not need to handle this as we are rename()ing the directory into
2522 * place only after we fsync()ed the state file.
2523 */
2524 if (fd < 0)
2525 ereport(PANIC,
2527 errmsg("could not open file \"%s\": %m", path)));
2528
2529 /*
2530 * Sync state file before we're reading from it. We might have crashed
2531 * while it wasn't synced yet and we shouldn't continue on that basis.
2532 */
2533 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2534 if (pg_fsync(fd) != 0)
2535 ereport(PANIC,
2537 errmsg("could not fsync file \"%s\": %m",
2538 path)));
2540
2541 /* Also sync the parent directory */
2543 fsync_fname(slotdir, true);
2545
2546 /* read part of statefile that's guaranteed to be version independent */
2547 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2548 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2550 if (readBytes != ReplicationSlotOnDiskConstantSize)
2551 {
2552 if (readBytes < 0)
2553 ereport(PANIC,
2555 errmsg("could not read file \"%s\": %m", path)));
2556 else
2557 ereport(PANIC,
2559 errmsg("could not read file \"%s\": read %d of %zu",
2560 path, readBytes,
2562 }
2563
2564 /* verify magic */
2565 if (cp.magic != SLOT_MAGIC)
2566 ereport(PANIC,
2568 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2569 path, cp.magic, SLOT_MAGIC)));
2570
2571 /* verify version */
2572 if (cp.version != SLOT_VERSION)
2573 ereport(PANIC,
2575 errmsg("replication slot file \"%s\" has unsupported version %u",
2576 path, cp.version)));
2577
2578 /* boundary check on length */
2580 ereport(PANIC,
2582 errmsg("replication slot file \"%s\" has corrupted length %u",
2583 path, cp.length)));
2584
2585 /* Now that we know the size, read the entire file */
2586 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2587 readBytes = read(fd,
2588 (char *) &cp + ReplicationSlotOnDiskConstantSize,
2589 cp.length);
2591 if (readBytes != cp.length)
2592 {
2593 if (readBytes < 0)
2594 ereport(PANIC,
2596 errmsg("could not read file \"%s\": %m", path)));
2597 else
2598 ereport(PANIC,
2600 errmsg("could not read file \"%s\": read %d of %zu",
2601 path, readBytes, (Size) cp.length)));
2602 }
2603
2604 if (CloseTransientFile(fd) != 0)
2605 ereport(PANIC,
2607 errmsg("could not close file \"%s\": %m", path)));
2608
2609 /* now verify the CRC */
2610 INIT_CRC32C(checksum);
2611 COMP_CRC32C(checksum,
2614 FIN_CRC32C(checksum);
2615
2616 if (!EQ_CRC32C(checksum, cp.checksum))
2617 ereport(PANIC,
2618 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2619 path, checksum, cp.checksum)));
2620
2621 /*
2622 * If we crashed with an ephemeral slot active, don't restore but delete
2623 * it.
2624 */
2626 {
2627 if (!rmtree(slotdir, true))
2628 {
2630 (errmsg("could not remove directory \"%s\"",
2631 slotdir)));
2632 }
2634 return;
2635 }
2636
2637 /*
2638 * Verify that requirements for the specific slot type are met. That's
2639 * important because if these aren't met we're not guaranteed to retain
2640 * all the necessary resources for the slot.
2641 *
2642 * NB: We have to do so *after* the above checks for ephemeral slots,
2643 * because otherwise a slot that shouldn't exist anymore could prevent
2644 * restarts.
2645 *
2646 * NB: Changing the requirements here also requires adapting
2647 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2648 */
2649 if (cp.slotdata.database != InvalidOid)
2650 {
2652 ereport(FATAL,
2653 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2654 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2655 NameStr(cp.slotdata.name)),
2656 errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2657
2658 /*
2659 * In standby mode, the hot standby must be enabled. This check is
2660 * necessary to ensure logical slots are invalidated when they become
2661 * incompatible due to insufficient wal_level. Otherwise, if the
2662 * primary reduces wal_level < logical while hot standby is disabled,
2663 * logical slots would remain valid even after promotion.
2664 */
2666 ereport(FATAL,
2667 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2668 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2669 NameStr(cp.slotdata.name)),
2670 errhint("Change \"hot_standby\" to be \"on\".")));
2671 }
2672 else if (wal_level < WAL_LEVEL_REPLICA)
2673 ereport(FATAL,
2674 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2675 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2676 NameStr(cp.slotdata.name)),
2677 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2678
2679 /* nothing can be active yet, don't lock anything */
2680 for (i = 0; i < max_replication_slots; i++)
2681 {
2682 ReplicationSlot *slot;
2683
2685
2686 if (slot->in_use)
2687 continue;
2688
2689 /* restore the entire set of persistent data */
2690 memcpy(&slot->data, &cp.slotdata,
2692
2693 /* initialize in memory state */
2694 slot->effective_xmin = cp.slotdata.xmin;
2698
2703
2704 slot->in_use = true;
2705 slot->active_pid = 0;
2706
2707 /*
2708 * Set the time since the slot has become inactive after loading the
2709 * slot from the disk into memory. Whoever acquires the slot i.e.
2710 * makes the slot active will reset it. Use the same inactive_since
2711 * time for all the slots.
2712 */
2713 if (now == 0)
2715
2717
2718 restored = true;
2719 break;
2720 }
2721
2722 if (!restored)
2723 ereport(FATAL,
2724 (errmsg("too many replication slots active before shutdown"),
2725 errhint("Increase \"max_replication_slots\" and try again.")));
2726}
#define PG_BINARY
Definition: c.h:1271
#define FATAL
Definition: elog.h:41
#define PANIC
Definition: elog.h:42
int CloseTransientFile(int fd)
Definition: fd.c:2851
int pg_fsync(int fd)
Definition: fd.c:386
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2674
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:42
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:153
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:158
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define ReplicationSlotOnDiskChecksummedSize
Definition: slot.c:135
#define ReplicationSlotOnDiskNotChecksummedSize
Definition: slot.c:132
#define ReplicationSlotOnDiskV2Size
Definition: slot.c:138
#define SLOT_VERSION
Definition: slot.c:142
#define SLOT_MAGIC
Definition: slot.c:141
#define ReplicationSlotOnDiskConstantSize
Definition: slot.c:129
uint32 version
Definition: slot.c:75
ReplicationSlotPersistentData slotdata
Definition: slot.c:83
pg_crc32c checksum
Definition: slot.c:72
TransactionId catalog_xmin
Definition: slot.h:122
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
bool EnableHotStandby
Definition: xlog.c:123
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:76
bool StandbyMode
Definition: xlogrecovery.c:150

References ReplicationSlot::active_pid, ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotOnDisk::checksum, CloseTransientFile(), COMP_CRC32C, ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlotPersistentData::database, DEBUG1, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, elog, EnableHotStandby, END_CRIT_SECTION, EQ_CRC32C, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errhint(), errmsg(), FATAL, fd(), FIN_CRC32C, fsync_fname(), GetCurrentTimestamp(), i, ReplicationSlot::in_use, INIT_CRC32C, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, ReplicationSlotOnDisk::length, ReplicationSlotOnDisk::magic, max_replication_slots, MAXPGPATH, name, ReplicationSlotPersistentData::name, NameStr, now(), OpenTransientFile(), PANIC, ReplicationSlotPersistentData::persistency, PG_BINARY, pg_fsync(), PG_REPLSLOT_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotOnDiskChecksummedSize, ReplicationSlotOnDiskConstantSize, ReplicationSlotOnDiskNotChecksummedSize, ReplicationSlotOnDiskV2Size, ReplicationSlotSetInactiveSince(), ReplicationSlotPersistentData::restart_lsn, rmtree(), RS_PERSISTENT, SLOT_MAGIC, SLOT_VERSION, ReplicationSlotOnDisk::slotdata, sprintf, StandbyMode, START_CRIT_SECTION, ReplicationSlotOnDisk::version, wal_level, WAL_LEVEL_LOGICAL, WAL_LEVEL_REPLICA, WARNING, and ReplicationSlotPersistentData::xmin.

Referenced by StartupReplicationSlots().

◆ SaveSlotToPath()

static void SaveSlotToPath ( ReplicationSlot slot,
const char *  dir,
int  elevel 
)
static

Definition at line 2328 of file slot.c.

2329{
2330 char tmppath[MAXPGPATH];
2331 char path[MAXPGPATH];
2332 int fd;
2334 bool was_dirty;
2335
2336 /* first check whether there's something to write out */
2337 SpinLockAcquire(&slot->mutex);
2338 was_dirty = slot->dirty;
2339 slot->just_dirtied = false;
2340 SpinLockRelease(&slot->mutex);
2341
2342 /* and don't do anything if there's nothing to write */
2343 if (!was_dirty)
2344 return;
2345
2347
2348 /* silence valgrind :( */
2349 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2350
2351 sprintf(tmppath, "%s/state.tmp", dir);
2352 sprintf(path, "%s/state", dir);
2353
2354 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2355 if (fd < 0)
2356 {
2357 /*
2358 * If not an ERROR, then release the lock before returning. In case
2359 * of an ERROR, the error recovery path automatically releases the
2360 * lock, but no harm in explicitly releasing even in that case. Note
2361 * that LWLockRelease() could affect errno.
2362 */
2363 int save_errno = errno;
2364
2366 errno = save_errno;
2367 ereport(elevel,
2369 errmsg("could not create file \"%s\": %m",
2370 tmppath)));
2371 return;
2372 }
2373
2374 cp.magic = SLOT_MAGIC;
2376 cp.version = SLOT_VERSION;
2378
2379 SpinLockAcquire(&slot->mutex);
2380
2381 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2382
2383 SpinLockRelease(&slot->mutex);
2384
2388 FIN_CRC32C(cp.checksum);
2389
2390 errno = 0;
2391 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2392 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2393 {
2394 int save_errno = errno;
2395
2398 unlink(tmppath);
2400
2401 /* if write didn't set errno, assume problem is no disk space */
2402 errno = save_errno ? save_errno : ENOSPC;
2403 ereport(elevel,
2405 errmsg("could not write to file \"%s\": %m",
2406 tmppath)));
2407 return;
2408 }
2410
2411 /* fsync the temporary file */
2412 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2413 if (pg_fsync(fd) != 0)
2414 {
2415 int save_errno = errno;
2416
2419 unlink(tmppath);
2421
2422 errno = save_errno;
2423 ereport(elevel,
2425 errmsg("could not fsync file \"%s\": %m",
2426 tmppath)));
2427 return;
2428 }
2430
2431 if (CloseTransientFile(fd) != 0)
2432 {
2433 int save_errno = errno;
2434
2435 unlink(tmppath);
2437
2438 errno = save_errno;
2439 ereport(elevel,
2441 errmsg("could not close file \"%s\": %m",
2442 tmppath)));
2443 return;
2444 }
2445
2446 /* rename to permanent file, fsync file and directory */
2447 if (rename(tmppath, path) != 0)
2448 {
2449 int save_errno = errno;
2450
2451 unlink(tmppath);
2453
2454 errno = save_errno;
2455 ereport(elevel,
2457 errmsg("could not rename file \"%s\" to \"%s\": %m",
2458 tmppath, path)));
2459 return;
2460 }
2461
2462 /*
2463 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2464 */
2466
2467 fsync_fname(path, false);
2468 fsync_fname(dir, true);
2470
2472
2473 /*
2474 * Successfully wrote. Unset the dirty bit, unless somebody dirtied the
2475 * slot again already, and remember the confirmed_flush LSN value.
2476 */
2477 SpinLockAcquire(&slot->mutex);
2478 if (!slot->just_dirtied)
2479 slot->dirty = false;
2482 SpinLockRelease(&slot->mutex);
2483
2485}
#define write(a, b, c)
Definition: win32.h:14

References ReplicationSlotOnDisk::checksum, CloseTransientFile(), COMP_CRC32C, ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlot::dirty, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, ReplicationSlot::io_in_progress_lock, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, ReplicationSlotOnDisk::length, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlotOnDisk::magic, MAXPGPATH, ReplicationSlot::mutex, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_REPLSLOT_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), ReplicationSlotOnDiskChecksummedSize, ReplicationSlotOnDiskNotChecksummedSize, ReplicationSlotOnDiskV2Size, ReplicationSlotPersistentData::restart_lsn, SLOT_MAGIC, SLOT_VERSION, ReplicationSlotOnDisk::slotdata, SpinLockAcquire, SpinLockRelease, sprintf, START_CRIT_SECTION, ReplicationSlotOnDisk::version, and write.

Referenced by CheckPointReplicationSlots(), CreateSlotOnDisk(), and ReplicationSlotSave().

◆ SearchNamedReplicationSlot()

ReplicationSlot * SearchNamedReplicationSlot ( const char *  name,
bool  need_lock 
)

Definition at line 546 of file slot.c.

547{
548 int i;
549 ReplicationSlot *slot = NULL;
550
551 if (need_lock)
552 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
553
554 for (i = 0; i < max_replication_slots; i++)
555 {
557
558 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
559 {
560 slot = s;
561 break;
562 }
563 }
564
565 if (need_lock)
566 LWLockRelease(ReplicationSlotControlLock);
567
568 return slot;
569}

References ReplicationSlot::data, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, name, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by acquire_conflict_slot_if_exists(), get_replslot_index(), pg_ls_replslotdir(), pgstat_reset_replslot(), ReadReplicationSlot(), ReplicationSlotAcquire(), StandbySlotsHaveCaughtup(), and synchronize_one_slot().

◆ SlotExistsInSyncStandbySlots()

bool SlotExistsInSyncStandbySlots ( const char *  slot_name)

Definition at line 2877 of file slot.c.

2878{
2879 const char *standby_slot_name;
2880
2881 /* Return false if there is no value in synchronized_standby_slots */
2883 return false;
2884
2885 /*
2886 * XXX: We are not expecting this list to be long so a linear search
2887 * shouldn't hurt but if that turns out not to be true then we can cache
2888 * this information for each WalSender as well.
2889 */
2890 standby_slot_name = synchronized_standby_slots_config->slot_names;
2891 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2892 {
2893 if (strcmp(standby_slot_name, slot_name) == 0)
2894 return true;
2895
2896 standby_slot_name += strlen(standby_slot_name) + 1;
2897 }
2898
2899 return false;
2900}

References i, SyncStandbySlotsConfigData::nslotnames, SyncStandbySlotsConfigData::slot_names, and synchronized_standby_slots_config.

Referenced by PhysicalWakeupLogicalWalSnd().

◆ StandbySlotsHaveCaughtup()

bool StandbySlotsHaveCaughtup ( XLogRecPtr  wait_for_lsn,
int  elevel 
)

Definition at line 2910 of file slot.c.

2911{
2912 const char *name;
2913 int caught_up_slot_num = 0;
2914 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2915
2916 /*
2917 * Don't need to wait for the standbys to catch up if there is no value in
2918 * synchronized_standby_slots.
2919 */
2921 return true;
2922
2923 /*
2924 * Don't need to wait for the standbys to catch up if we are on a standby
2925 * server, since we do not support syncing slots to cascading standbys.
2926 */
2927 if (RecoveryInProgress())
2928 return true;
2929
2930 /*
2931 * Don't need to wait for the standbys to catch up if they are already
2932 * beyond the specified WAL location.
2933 */
2935 ss_oldest_flush_lsn >= wait_for_lsn)
2936 return true;
2937
2938 /*
2939 * To prevent concurrent slot dropping and creation while filtering the
2940 * slots, take the ReplicationSlotControlLock outside of the loop.
2941 */
2942 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2943
2945 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2946 {
2947 XLogRecPtr restart_lsn;
2948 bool invalidated;
2949 bool inactive;
2950 ReplicationSlot *slot;
2951
2952 slot = SearchNamedReplicationSlot(name, false);
2953
2954 /*
2955 * If a slot name provided in synchronized_standby_slots does not
2956 * exist, report a message and exit the loop.
2957 */
2958 if (!slot)
2959 {
2960 ereport(elevel,
2961 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2962 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2963 name, "synchronized_standby_slots"),
2964 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2965 name),
2966 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2967 name, "synchronized_standby_slots"));
2968 break;
2969 }
2970
2971 /* Same as above: if a slot is not physical, exit the loop. */
2972 if (SlotIsLogical(slot))
2973 {
2974 ereport(elevel,
2975 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2976 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2977 name, "synchronized_standby_slots"),
2978 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2979 name),
2980 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2981 name, "synchronized_standby_slots"));
2982 break;
2983 }
2984
2985 SpinLockAcquire(&slot->mutex);
2986 restart_lsn = slot->data.restart_lsn;
2987 invalidated = slot->data.invalidated != RS_INVAL_NONE;
2988 inactive = slot->active_pid == 0;
2989 SpinLockRelease(&slot->mutex);
2990
2991 if (invalidated)
2992 {
2993 /* Specified physical slot has been invalidated */
2994 ereport(elevel,
2995 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2996 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2997 name, "synchronized_standby_slots"),
2998 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2999 name),
3000 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3001 name, "synchronized_standby_slots"));
3002 break;
3003 }
3004
3005 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3006 {
3007 /* Log a message if no active_pid for this physical slot */
3008 if (inactive)
3009 ereport(elevel,
3010 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3011 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3012 name, "synchronized_standby_slots"),
3013 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3014 name),
3015 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3016 name, "synchronized_standby_slots"));
3017
3018 /* Stop checking: this slot hasn't caught up, so we must keep waiting. */
3019 break;
3020 }
3021
3022 Assert(restart_lsn >= wait_for_lsn);
3023
3024 if (!XLogRecPtrIsValid(min_restart_lsn) ||
3025 min_restart_lsn > restart_lsn)
3026 min_restart_lsn = restart_lsn;
3027
3028 caught_up_slot_num++;
3029
3030 name += strlen(name) + 1;
3031 }
3032
3033 LWLockRelease(ReplicationSlotControlLock);
3034
3035 /*
3036 * Return false if not all the standbys have caught up to the specified
3037 * WAL location.
3038 */
3039 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3040 return false;
3041
3042 /* The ss_oldest_flush_lsn must not retreat. */
3044 min_restart_lsn >= ss_oldest_flush_lsn);
3045
3046 ss_oldest_flush_lsn = min_restart_lsn;
3047
3048 return true;
3049}

References ReplicationSlot::active_pid, Assert(), ReplicationSlot::data, ereport, errcode(), errdetail(), errhint(), errmsg(), i, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, name, SyncStandbySlotsConfigData::nslotnames, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SearchNamedReplicationSlot(), SyncStandbySlotsConfigData::slot_names, SlotIsLogical, SpinLockAcquire, SpinLockRelease, ss_oldest_flush_lsn, synchronized_standby_slots_config, and XLogRecPtrIsValid.

Referenced by NeedToWaitForStandbys(), and WaitForStandbyConfirmation().

◆ StartupReplicationSlots()

void StartupReplicationSlots ( void  )

Definition at line 2206 of file slot.c.

2207{
2208 DIR *replication_dir;
2209 struct dirent *replication_de;
2210
2211 elog(DEBUG1, "starting up replication slots");
2212
2213 /* restore all slots by iterating over all on-disk entries */
2214 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2215 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2216 {
2217 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2218 PGFileType de_type;
2219
2220 if (strcmp(replication_de->d_name, ".") == 0 ||
2221 strcmp(replication_de->d_name, "..") == 0)
2222 continue;
2223
2224 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2225 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2226
2227 /* we're only creating directories here, skip if it's not ours */
2228 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2229 continue;
2230
2231 /* we crashed while a slot was being setup or deleted, clean up */
2232 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2233 {
2234 if (!rmtree(path, true))
2235 {
2237 (errmsg("could not remove directory \"%s\"",
2238 path)));
2239 continue;
2240 }
2242 continue;
2243 }
2244
2245 /* looks like a slot in a normal state, restore */
2246 RestoreSlotFromDisk(replication_de->d_name);
2247 }
2248 FreeDir(replication_dir);
2249
2250 /* replication slots are disabled, so none can exist; we're done. */
2251 if (max_replication_slots <= 0)
2252 return;
2253
2254 /* Now that we have recovered all the data, compute replication xmin */
2257}
int FreeDir(DIR *dir)
Definition: fd.c:3005
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2887
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2953
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:547
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
#define snprintf
Definition: port.h:260
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:2491
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:31
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15

References AllocateDir(), dirent::d_name, DEBUG1, elog, ereport, errmsg(), FreeDir(), fsync_fname(), get_dirent_type(), max_replication_slots, MAXPGPATH, PG_REPLSLOT_DIR, pg_str_endswith(), PGFILETYPE_DIR, PGFILETYPE_ERROR, ReadDir(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RestoreSlotFromDisk(), rmtree(), snprintf, and WARNING.

Referenced by StartupXLOG().

◆ StaticAssertDecl()

StaticAssertDecl ( lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES+1),
"array length mismatch"   
)

◆ validate_sync_standby_slots()

static bool validate_sync_standby_slots ( char *  rawname,
List **  elemlist 
)
static

Definition at line 2772 of file slot.c.

2773{
2774 /* Verify syntax and parse string into a list of identifiers */
2775 if (!SplitIdentifierString(rawname, ',', elemlist))
2776 {
2777 GUC_check_errdetail("List syntax is invalid.");
2778 return false;
2779 }
2780
2781 /* Iterate the list to validate each slot name */
2782 foreach_ptr(char, name, *elemlist)
2783 {
2784 int err_code;
2785 char *err_msg = NULL;
2786 char *err_hint = NULL;
2787
2788 if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2789 &err_msg, &err_hint))
2790 {
2791 GUC_check_errcode(err_code);
2792 GUC_check_errdetail("%s", err_msg);
2793 if (err_hint != NULL)
2794 GUC_check_errhint("%s", err_hint);
2795 return false;
2796 }
2797 }
2798
2799 return true;
2800}
void GUC_check_errcode(int sqlerrcode)
Definition: guc.c:6628
#define GUC_check_errdetail
Definition: guc.h:505
#define GUC_check_errhint
Definition: guc.h:509
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:2755

References foreach_ptr, GUC_check_errcode(), GUC_check_errdetail, GUC_check_errhint, name, ReplicationSlotValidateNameInternal(), and SplitIdentifierString().

Referenced by check_synchronized_standby_slots().

◆ WaitForStandbyConfirmation()

void WaitForStandbyConfirmation ( XLogRecPtr  wait_for_lsn)

Definition at line 3058 of file slot.c.

3059{
3060 /*
3061 * Don't need to wait for the standby to catch up if the current acquired
3062 * slot is not a logical failover slot, or there is no value in
3063 * synchronized_standby_slots.
3064 */
3066 return;
3067
3069
3070 for (;;)
3071 {
3073
3075 {
3076 ConfigReloadPending = false;
3078 }
3079
3080 /* Exit if done waiting for every slot. */
3081 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3082 break;
3083
3084 /*
3085 * Wait for the slots in the synchronized_standby_slots to catch up,
3086 * but use a timeout (1s) so we can also check if the
3087 * synchronized_standby_slots has been changed.
3088 */
3090 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3091 }
3092
3094}
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2910
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:117

References CHECK_FOR_INTERRUPTS, ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableTimedSleep(), ConfigReloadPending, ReplicationSlot::data, ReplicationSlotPersistentData::failover, MyReplicationSlot, PGC_SIGHUP, ProcessConfigFile(), StandbySlotsHaveCaughtup(), synchronized_standby_slots_config, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtl, and WARNING.

Referenced by LogicalSlotAdvanceAndCheckSnapState(), and pg_logical_slot_get_changes_guts().

Variable Documentation

◆ idle_replication_slot_timeout_secs

int idle_replication_slot_timeout_secs = 0

◆ max_replication_slots

◆ MyReplicationSlot

ReplicationSlot* MyReplicationSlot = NULL

Definition at line 148 of file slot.c.

Referenced by ApplyLauncherMain(), binary_upgrade_logical_slot_has_caught_up(), compute_min_nonremovable_xid(), copy_replication_slot(), create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateDecodingContext(), CreateInitDecodingContext(), CreateReplicationSlot(), init_conflict_slot_xmin(), InvalidatePossiblyObsoleteSlot(), LogicalConfirmReceivedLocation(), LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), logicalrep_worker_launch(), LogicalReplicationSlotHasPendingWal(), LogicalSlotAdvanceAndCheckSnapState(), NeedToWaitForStandbys(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_physical_replication_slot_advance(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), PhysicalReplicationSlotNewXmin(), PhysicalWakeupLogicalWalSnd(), PostgresMain(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), ReorderBufferAllocate(), ReorderBufferFree(), ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReplicationSlotAcquire(), ReplicationSlotAlter(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDrop(), ReplicationSlotDropAcquired(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotsDropDBSlots(), ReplicationSlotShmemExit(), reserve_wal_for_local_slot(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), StartupDecodingContext(), synchronize_one_slot(), update_and_persist_local_synced_slot(), update_conflict_slot_xmin(), update_local_synced_slot(), update_slotsync_skip_stats(), WaitForStandbyConfirmation(), and WalSndErrorCleanup().

◆ ReplicationSlotCtl

◆ SlotInvalidationCauses

const SlotInvalidationCauseMap SlotInvalidationCauses[]
static
Initial value:
= {
{RS_INVAL_NONE, "none"},
{RS_INVAL_WAL_REMOVED, "wal_removed"},
{RS_INVAL_HORIZON, "rows_removed"},
{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
}

Definition at line 113 of file slot.c.

Referenced by GetSlotInvalidationCause(), and GetSlotInvalidationCauseName().

◆ ss_oldest_flush_lsn

XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr
static

Definition at line 173 of file slot.c.

Referenced by assign_synchronized_standby_slots(), and StandbySlotsHaveCaughtup().

◆ synchronized_standby_slots

char* synchronized_standby_slots

Definition at line 164 of file slot.c.

◆ synchronized_standby_slots_config