PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "common/file_utils.h"
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/subsystems.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/injection_point.h"
#include "utils/varlena.h"
#include "utils/wait_event.h"
Include dependency graph for slot.c:

Go to the source code of this file.

Data Structures

struct  ReplicationSlotOnDisk
 
struct  SyncStandbySlotsConfigData
 
struct  SlotInvalidationCauseMap
 

Macros

#define ReplicationSlotOnDiskConstantSize    offsetof(ReplicationSlotOnDisk, slotdata)
 
#define ReplicationSlotOnDiskNotChecksummedSize    offsetof(ReplicationSlotOnDisk, version)
 
#define ReplicationSlotOnDiskChecksummedSize    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
 
#define ReplicationSlotOnDiskV2Size    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
#define SLOT_MAGIC   0x1051CA1 /* format identifier */
 
#define SLOT_VERSION   5 /* version for new files */
 

Typedefs

typedef struct ReplicationSlotOnDisk ReplicationSlotOnDisk
 
typedef struct SlotInvalidationCauseMap SlotInvalidationCauseMap
 

Functions

 StaticAssertDecl (lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
 
static void ReplicationSlotsShmemRequest (void *arg)
 
static void ReplicationSlotsShmemInit (void *arg)
 
static void ReplicationSlotShmemExit (int code, Datum arg)
 
static bool IsSlotForConflictCheck (const char *name)
 
static void ReplicationSlotDropPtr (ReplicationSlot *slot)
 
static void RestoreSlotFromDisk (const char *name)
 
static void CreateSlotOnDisk (ReplicationSlot *slot)
 
static void SaveSlotToPath (ReplicationSlot *slot, const char *dir, int elevel)
 
void ReplicationSlotInitialize (void)
 
bool ReplicationSlotValidateName (const char *name, bool allow_reserved_name, int elevel)
 
bool ReplicationSlotValidateNameInternal (const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
 
void ReplicationSlotCreate (const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
 
ReplicationSlot * SearchNamedReplicationSlot (const char *name, bool need_lock)
 
int ReplicationSlotIndex (ReplicationSlot *slot)
 
bool ReplicationSlotName (int index, Name name)
 
void ReplicationSlotAcquire (const char *name, bool nowait, bool error_if_invalid)
 
void ReplicationSlotRelease (void)
 
void ReplicationSlotCleanup (bool synced_only)
 
void ReplicationSlotDrop (const char *name, bool nowait)
 
void ReplicationSlotAlter (const char *name, const bool *failover, const bool *two_phase)
 
void ReplicationSlotDropAcquired (void)
 
void ReplicationSlotSave (void)
 
void ReplicationSlotMarkDirty (void)
 
void ReplicationSlotPersist (void)
 
void ReplicationSlotsComputeRequiredXmin (bool already_locked)
 
void ReplicationSlotsComputeRequiredLSN (void)
 
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN (void)
 
bool ReplicationSlotsCountDBSlots (Oid dboid, int *nslots, int *nactive)
 
void ReplicationSlotsDropDBSlots (Oid dboid)
 
bool CheckLogicalSlotExists (void)
 
void CheckSlotRequirements (bool repack)
 
void CheckSlotPermissions (void)
 
void ReplicationSlotReserveWal (void)
 
static void ReportSlotInvalidation (ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
 
static bool CanInvalidateIdleSlot (ReplicationSlot *s)
 
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause (uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
 
static bool InvalidatePossiblyObsoleteSlot (uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
 
bool InvalidateObsoleteReplicationSlots (uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
 
void CheckPointReplicationSlots (bool is_shutdown)
 
void StartupReplicationSlots (void)
 
ReplicationSlotInvalidationCause GetSlotInvalidationCause (const char *cause_name)
 
const char * GetSlotInvalidationCauseName (ReplicationSlotInvalidationCause cause)
 
static bool validate_sync_standby_slots (char *rawname, List **elemlist)
 
bool check_synchronized_standby_slots (char **newval, void **extra, GucSource source)
 
void assign_synchronized_standby_slots (const char *newval, void *extra)
 
bool SlotExistsInSyncStandbySlots (const char *slot_name)
 
bool StandbySlotsHaveCaughtup (XLogRecPtr wait_for_lsn, int elevel)
 
void WaitForStandbyConfirmation (XLogRecPtr wait_for_lsn)
 

Variables

static const SlotInvalidationCauseMap SlotInvalidationCauses []
 
ReplicationSlotCtlData * ReplicationSlotCtl = NULL
 
const ShmemCallbacks ReplicationSlotsShmemCallbacks
 
ReplicationSlot * MyReplicationSlot = NULL
 
int max_replication_slots = 10
 
int max_repack_replication_slots = 5
 
int idle_replication_slot_timeout_secs = 0
 
char * synchronized_standby_slots
 
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
 
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr
 

Macro Definition Documentation

◆ ReplicationSlotOnDiskChecksummedSize

#define ReplicationSlotOnDiskChecksummedSize    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize

Definition at line 137 of file slot.c.

152 {
153 .request_fn = ReplicationSlotsShmemRequest,
154 .init_fn = ReplicationSlotsShmemInit,
155};
156
157/* My backend's replication slot in the shared memory array */
159
160/* GUC variables */
161int max_replication_slots = 10; /* the maximum number of replication
162 * slots */
163int max_repack_replication_slots = 5; /* the maximum number of slots
164 * for REPACK */
165
166/*
167 * Invalidate replication slots that have remained idle longer than this
168 * duration; '0' disables it.
169 */
171
172/*
173 * This GUC lists streaming replication standby server slot names that
174 * logical WAL sender processes will wait for.
175 */
177
178/* This is the parsed and cached configuration for synchronized_standby_slots */
180
181/*
182 * Oldest LSN that has been confirmed to be flushed to the standbys
183 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
184 */
186
187static void ReplicationSlotShmemExit(int code, Datum arg);
188static bool IsSlotForConflictCheck(const char *name);
189static void ReplicationSlotDropPtr(ReplicationSlot *slot);
190
191/* internal persistency functions */
192static void RestoreSlotFromDisk(const char *name);
193static void CreateSlotOnDisk(ReplicationSlot *slot);
194static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
195
196/*
197 * Register shared memory space needed for replication slots.
198 */
199static void
201{
202 Size size;
203
205 return;
206
207 size = offsetof(ReplicationSlotCtlData, replication_slots);
208 size = add_size(size,
210 sizeof(ReplicationSlot)));
211 ShmemRequestStruct(.name = "ReplicationSlot Ctl",
212 .size = size,
213 .ptr = (void **) &ReplicationSlotCtl,
214 );
215}
216
217/*
218 * Initialize shared memory for replication slots.
219 */
220static void
222{
224 {
226
227 /* everything else is zeroed by the memset above */
229 SpinLockInit(&slot->mutex);
231 LWTRANCHE_REPLICATION_SLOT_IO);
233 }
234}
235
236/*
237 * Register the callback for replication slot cleanup and releasing.
238 */
239void
241{
243}
244
245/*
246 * Release and cleanup replication slots.
247 */
248static void
250{
251 /* Make sure active replication slots are released */
252 if (MyReplicationSlot != NULL)
254
255 /* Also cleanup all the temporary slots. */
257}
258
/*
 * ReplicationSlotValidateName
 *		Validate a slot name, reporting any problem at the given elevel.
 *
 * This is a reporting wrapper around ReplicationSlotValidateNameInternal(),
 * which holds the actual naming rules.  When the name is rejected, the error
 * code, message and optional hint collected by that function are handed to
 * ereport() at 'elevel'; if ereport() returns, the palloc'd strings are
 * released and false is returned.  Returns true when the name is acceptable.
 */
bool
ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
							int elevel)
{
	int			code;
	char	   *msg = NULL;
	char	   *hint = NULL;

	/* Nothing to report if the name passes every check. */
	if (ReplicationSlotValidateNameInternal(name, allow_reserved_name,
											&code, &msg, &hint))
		return true;

	/*
	 * The message and hint were already translated by
	 * ReplicationSlotValidateNameInternal(), so pass them through the
	 * _internal variants to avoid translating them a second time.
	 */
	ereport(elevel,
			errcode(code),
			errmsg_internal("%s", msg),
			hint != NULL ? errhint_internal("%s", hint) : 0);

	/* Release the strings allocated by the validation routine. */
	pfree(msg);
	if (hint != NULL)
		pfree(hint);

	return false;
}
294
295/*
296 * Check whether the passed slot name is valid.
297 *
298 * An error will be reported for a reserved replication slot name if
299 * allow_reserved_name is set to false.
300 *
301 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
302 * the name to be used as a directory name on every supported OS.
303 *
304 * Returns true if the slot name is valid. Otherwise, returns false and stores
305 * the error code, error message, and optional hint in err_code, err_msg, and
306 * err_hint, respectively. The caller is responsible for freeing err_msg and
307 * err_hint, which are palloc'd.
308 */
309bool
310ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
311 int *err_code, char **err_msg, char **err_hint)
312{
313 const char *cp;
314
315 if (strlen(name) == 0)
316 {
317 *err_code = ERRCODE_INVALID_NAME;
318 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
319 *err_hint = NULL;
320 return false;
321 }
322
323 if (strlen(name) >= NAMEDATALEN)
324 {
325 *err_code = ERRCODE_NAME_TOO_LONG;
326 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
327 *err_hint = NULL;
328 return false;
329 }
330
331 for (cp = name; *cp; cp++)
332 {
333 if (!((*cp >= 'a' && *cp <= 'z')
334 || (*cp >= '0' && *cp <= '9')
335 || (*cp == '_')))
336 {
337 *err_code = ERRCODE_INVALID_NAME;
338 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
339 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
340 return false;
341 }
342 }
343
344 if (!allow_reserved_name && IsSlotForConflictCheck(name))
345 {
346 *err_code = ERRCODE_RESERVED_NAME;
347 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
348 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 return false;
351 }
352
353 return true;
354}
355
356/*
357 * Return true if the replication slot name is "pg_conflict_detection".
358 */
359static bool
360IsSlotForConflictCheck(const char *name)
361{
362 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
363}
364
365/*
366 * Create a new replication slot and mark it as used by this backend.
367 *
368 * name: Name of the slot
369 * db_specific: logical decoding is db specific; if the slot is going to
370 * be used for that pass true, otherwise false.
371 * two_phase: If enabled, allows decoding of prepared transactions.
372 * repack: If true, use a slot from the pool for REPACK.
373 * failover: If enabled, allows the slot to be synced to standbys so
374 * that logical replication can be resumed after failover.
375 * synced: True if the slot is synchronized from the primary server.
376 */
377void
378ReplicationSlotCreate(const char *name, bool db_specific,
379 ReplicationSlotPersistency persistency,
380 bool two_phase, bool repack, bool failover, bool synced)
381{
382 ReplicationSlot *slot = NULL;
383 int startpoint,
384 endpoint;
385
386 Assert(MyReplicationSlot == NULL);
387
388 /*
389 * The logical launcher or pg_upgrade may create or migrate an internal
390 * slot, so using a reserved name is allowed in these cases.
391 */
393 ERROR);
394
395 if (failover)
396 {
397 /*
398 * Do not allow users to create the failover enabled slots on the
399 * standby as we do not support sync to the cascading standby.
400 *
401 * However, failover enabled slots can be created during slot
402 * synchronization because we need to retain the same values as the
403 * remote slot.
404 */
407 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408 errmsg("cannot enable failover for a replication slot created on the standby"));
409
410 /*
411 * Do not allow users to create failover enabled temporary slots,
412 * because temporary slots will not be synced to the standby.
413 *
414 * However, failover enabled temporary slots can be created during
415 * slot synchronization. See the comments atop slotsync.c for details.
416 */
417 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
419 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
420 errmsg("cannot enable failover for a temporary replication slot"));
421 }
422
423 /*
424 * If some other backend ran this code concurrently with us, we'd likely
425 * both allocate the same slot, and that would be bad. We'd also be at
426 * risk of missing a name collision. Also, we don't want to try to create
427 * a new slot while somebody's busy cleaning up an old one, because we
428 * might both be monkeying with the same directory.
429 */
430 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
431
432 /*
433 * Check for name collision (across the whole array), and identify an
434 * allocatable slot (in the array slice specific to our current use case:
435 * either general, or REPACK only). We need to hold
436 * ReplicationSlotControlLock in shared mode for this, so that nobody else
437 * can change the in_use flags while we're looking at them.
438 */
439 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
440 startpoint = !repack ? 0 : max_replication_slots;
441 endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
443 {
445
446 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
449 errmsg("replication slot \"%s\" already exists", name)));
450
451 if (i >= startpoint && i < endpoint &&
452 !s->in_use && slot == NULL)
453 slot = s;
454 }
455 LWLockRelease(ReplicationSlotControlLock);
456
457 /* If all slots are in use, we're out of luck. */
458 if (slot == NULL)
460 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
461 errmsg("all replication slots are in use"),
462 errhint("Free one or increase \"%s\".",
463 repack ? "max_repack_replication_slots" : "max_replication_slots")));
464
465 /*
466 * Since this slot is not in use, nobody should be looking at any part of
467 * it other than the in_use field unless they're trying to allocate it.
468 * And since we hold ReplicationSlotAllocationLock, nobody except us can
469 * be doing that. So it's safe to initialize the slot.
470 */
471 Assert(!slot->in_use);
473
474 /* first initialize persistent data */
475 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
476 namestrcpy(&slot->data.name, name);
477 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
478 slot->data.persistency = persistency;
479 slot->data.two_phase = two_phase;
481 slot->data.failover = failover;
482 slot->data.synced = synced;
483
484 /* and then data only present in shared memory */
485 slot->just_dirtied = false;
486 slot->dirty = false;
495 slot->inactive_since = 0;
497
498 /*
499 * Create the slot on disk. We haven't actually marked the slot allocated
500 * yet, so no special cleanup is required if this errors out.
501 */
502 CreateSlotOnDisk(slot);
503
504 /*
505 * We need to briefly prevent any other backend from iterating over the
506 * slots while we flip the in_use flag. We also need to set the active
507 * flag while holding the ControlLock as otherwise a concurrent
508 * ReplicationSlotAcquire() could acquire the slot as well.
509 */
510 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
511
512 slot->in_use = true;
513
514 /* We can now mark the slot active, and that makes it our slot. */
515 SpinLockAcquire(&slot->mutex);
518 SpinLockRelease(&slot->mutex);
519 MyReplicationSlot = slot;
520
521 LWLockRelease(ReplicationSlotControlLock);
522
523 /*
524 * Create statistics entry for the new logical slot. We don't collect any
525 * stats for physical slots, so no need to create an entry for the same.
526 * See ReplicationSlotDropPtr for why we need to do this before releasing
527 * ReplicationSlotAllocationLock.
528 */
529 if (SlotIsLogical(slot))
531
532 /*
533 * Now that the slot has been marked as in_use and active, it's safe to
534 * let somebody else try to allocate a slot.
535 */
536 LWLockRelease(ReplicationSlotAllocationLock);
537
538 /* Let everybody know we've modified this slot */
540}
541
542/*
543 * Search for the named replication slot.
544 *
545 * Return the replication slot if found, otherwise NULL.
546 */
548SearchNamedReplicationSlot(const char *name, bool need_lock)
549{
550 int i;
551 ReplicationSlot *slot = NULL;
552
553 if (need_lock)
554 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
555
557 {
559
560 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
561 {
562 slot = s;
563 break;
564 }
565 }
566
567 if (need_lock)
568 LWLockRelease(ReplicationSlotControlLock);
569
570 return slot;
571}
572
573/*
574 * Return the index of the replication slot in
575 * ReplicationSlotCtl->replication_slots.
576 *
577 * This is mainly useful to have an efficient key for storing replication slot
578 * stats.
579 */
580int
582{
584 slot < ReplicationSlotCtl->replication_slots +
586
588}
589
590/*
591 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
592 * the slot's name and true is returned.
593 *
594 * This likely is only useful for pgstat_replslot.c during shutdown, in other
595 * cases there are obvious TOCTOU issues.
596 */
597bool
599{
600 ReplicationSlot *slot;
601 bool found;
602
604
605 /*
606 * Ensure that the slot cannot be dropped while we copy the name. Don't
607 * need the spinlock as the name of an existing slot cannot change.
608 */
609 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
610 found = slot->in_use;
611 if (slot->in_use)
613 LWLockRelease(ReplicationSlotControlLock);
614
615 return found;
616}
617
618/*
619 * Find a previously created slot and mark it as used by this process.
620 *
621 * An error is raised if nowait is true and the slot is currently in use. If
622 * nowait is false, we sleep until the slot is released by the owning process.
623 *
624 * An error is raised if error_if_invalid is true and the slot is found to
625 * be invalid. It should always be set to true, except when we are temporarily
626 * acquiring the slot and don't intend to change it.
627 */
628void
629ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
630{
632 ProcNumber active_proc;
633 int active_pid;
634
635 Assert(name != NULL);
636
637retry:
638 Assert(MyReplicationSlot == NULL);
639
640 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
641
642 /* Check if the slot exists with the given name. */
644 if (s == NULL || !s->in_use)
645 {
646 LWLockRelease(ReplicationSlotControlLock);
647
649 (errcode(ERRCODE_UNDEFINED_OBJECT),
650 errmsg("replication slot \"%s\" does not exist",
651 name)));
652 }
653
654 /*
655 * Do not allow users to acquire the reserved slot. This scenario may
656 * occur if the launcher that owns the slot has terminated unexpectedly
657 * due to an error, and a backend process attempts to reuse the slot.
658 */
661 errcode(ERRCODE_UNDEFINED_OBJECT),
662 errmsg("cannot acquire replication slot \"%s\"", name),
663 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
664
665 /*
666 * This is the slot we want; check if it's active under some other
667 * process. In single user mode, we don't need this check.
668 */
670 {
671 /*
672 * Get ready to sleep on the slot in case it is active. (We may end
673 * up not sleeping, but we don't want to do this while holding the
674 * spinlock.)
675 */
676 if (!nowait)
678
679 /*
680 * It is important to reset the inactive_since under spinlock here to
681 * avoid race conditions with slot invalidation. See comments related
682 * to inactive_since in InvalidatePossiblyObsoleteSlot.
683 */
687 active_proc = s->active_proc;
690 }
691 else
692 {
693 s->active_proc = active_proc = MyProcNumber;
695 }
696 active_pid = GetPGProcByNumber(active_proc)->pid;
697 LWLockRelease(ReplicationSlotControlLock);
698
699 /*
700 * If we found the slot but it's already active in another process, we
701 * wait until the owning process signals us that it's been released, or
702 * error out.
703 */
704 if (active_proc != MyProcNumber)
705 {
706 if (!nowait)
707 {
708 /* Wait here until we get signaled, and then restart */
710 WAIT_EVENT_REPLICATION_SLOT_DROP);
712 goto retry;
713 }
714
716 (errcode(ERRCODE_OBJECT_IN_USE),
717 errmsg("replication slot \"%s\" is active for PID %d",
718 NameStr(s->data.name), active_pid)));
719 }
720 else if (!nowait)
721 ConditionVariableCancelSleep(); /* no sleep needed after all */
722
723 /* We made this slot active, so it's ours now. */
725
726 /*
727 * We need to check for invalidation after making the slot ours to avoid
728 * the possible race condition with the checkpointer that can otherwise
729 * invalidate the slot immediately after the check.
730 */
731 if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
733 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
734 errmsg("can no longer access replication slot \"%s\"",
735 NameStr(s->data.name)),
736 errdetail("This replication slot has been invalidated due to \"%s\".",
738
739 /* Let everybody know we've modified this slot */
741
742 /*
743 * The call to pgstat_acquire_replslot() protects against stats for a
744 * different slot, from before a restart or such, being present during
745 * pgstat_report_replslot().
746 */
747 if (SlotIsLogical(s))
749
750
751 if (am_walsender)
752 {
755 ? errmsg("acquired logical replication slot \"%s\"",
756 NameStr(s->data.name))
757 : errmsg("acquired physical replication slot \"%s\"",
758 NameStr(s->data.name)));
759 }
760}
761
762/*
763 * Release the replication slot that this backend considers itself to own.
764 *
765 * This or another backend can re-acquire the slot later.
766 * Resources this slot requires will be preserved.
767 */
768void
770{
772 char *slotname = NULL; /* keep compiler quiet */
773 bool is_logical;
774 TimestampTz now = 0;
775
776 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
777
778 is_logical = SlotIsLogical(slot);
779
780 if (am_walsender)
781 slotname = pstrdup(NameStr(slot->data.name));
782
783 if (slot->data.persistency == RS_EPHEMERAL)
784 {
785 /*
786 * Delete the slot. There is no !PANIC case where this is allowed to
787 * fail, all that may happen is an incomplete cleanup of the on-disk
788 * data.
789 */
791
792 /*
793 * Request to disable logical decoding, even though this slot may not
794 * have been the last logical slot. The checkpointer will verify if
795 * logical decoding should actually be disabled.
796 */
797 if (is_logical)
799 }
800
801 /*
802 * If slot needed to temporarily restrain both data and catalog xmin to
803 * create the catalog snapshot, remove that temporary constraint.
804 * Snapshots can only be exported while the initial snapshot is still
805 * acquired.
806 */
807 if (!TransactionIdIsValid(slot->data.xmin) &&
809 {
810 SpinLockAcquire(&slot->mutex);
812 SpinLockRelease(&slot->mutex);
814 }
815
816 /*
817 * Set the time since the slot has become inactive. We get the current
818 * time beforehand to avoid system call while holding the spinlock.
819 */
821
822 if (slot->data.persistency == RS_PERSISTENT)
823 {
824 /*
825 * Mark persistent slot inactive. We're not freeing it, just
826 * disconnecting, but wake up others that may be waiting for it.
827 */
828 SpinLockAcquire(&slot->mutex);
831 SpinLockRelease(&slot->mutex);
833 }
834 else
836
837 MyReplicationSlot = NULL;
838
839 /* might not have been set when we've been a plain slot */
840 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
841 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
843 LWLockRelease(ProcArrayLock);
844
845 if (am_walsender)
846 {
848 is_logical
849 ? errmsg("released logical replication slot \"%s\"",
850 slotname)
851 : errmsg("released physical replication slot \"%s\"",
852 slotname));
853
854 pfree(slotname);
855 }
856}
857
858/*
859 * Cleanup temporary slots created in current session.
860 *
861 * Cleanup only synced temporary slots if 'synced_only' is true, else
862 * cleanup all temporary slots.
863 *
864 * If it drops the last logical slot in the cluster, requests to disable
865 * logical decoding.
866 */
867void
868ReplicationSlotCleanup(bool synced_only)
869{
870 int i;
871 bool found_valid_logicalslot;
872 bool dropped_logical = false;
873
874 Assert(MyReplicationSlot == NULL);
875
876restart:
877 found_valid_logicalslot = false;
878 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
880 {
882
883 if (!s->in_use)
884 continue;
885
887
888 found_valid_logicalslot |=
890
891 if ((s->active_proc == MyProcNumber &&
892 (!synced_only || s->data.synced)))
893 {
896 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
897
898 if (SlotIsLogical(s))
899 dropped_logical = true;
900
902
904 goto restart;
905 }
906 else
908 }
909
910 LWLockRelease(ReplicationSlotControlLock);
911
912 if (dropped_logical && !found_valid_logicalslot)
914}
915
916/*
917 * Permanently drop replication slot identified by the passed in name.
918 */
919void
920ReplicationSlotDrop(const char *name, bool nowait)
921{
922 bool is_logical;
923
924 Assert(MyReplicationSlot == NULL);
925
926 ReplicationSlotAcquire(name, nowait, false);
927
928 /*
929 * Do not allow users to drop the slots which are currently being synced
930 * from the primary to the standby.
931 */
934 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
935 errmsg("cannot drop replication slot \"%s\"", name),
936 errdetail("This replication slot is being synchronized from the primary server."));
937
938 is_logical = SlotIsLogical(MyReplicationSlot);
939
941
942 if (is_logical)
944}
945
946/*
947 * Change the definition of the slot identified by the specified name.
948 *
949 * Altering the two_phase property of a slot requires caution on the
950 * client-side. Enabling it at any random point during decoding has the
951 * risk that transactions prepared before this change may be skipped by
952 * the decoder, leading to missing prepare records on the client. So, we
953 * enable it for subscription related slots only once the initial tablesync
954 * is finished. See comments atop worker.c. Disabling it is safe only when
955 * there are no pending prepared transactions; otherwise, the changes of
956 * already prepared transactions can be replicated again along with their
957 * corresponding commit leading to duplicate data or errors.
958 */
959void
960ReplicationSlotAlter(const char *name, const bool *failover,
961 const bool *two_phase)
962{
963 bool update_slot = false;
964
965 Assert(MyReplicationSlot == NULL);
967
968 ReplicationSlotAcquire(name, false, true);
969
972 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
973 errmsg("cannot use %s with a physical replication slot",
974 "ALTER_REPLICATION_SLOT"));
975
976 if (RecoveryInProgress())
977 {
978 /*
979 * Do not allow users to alter the slots which are currently being
980 * synced from the primary to the standby.
981 */
984 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
985 errmsg("cannot alter replication slot \"%s\"", name),
986 errdetail("This replication slot is being synchronized from the primary server."));
987
988 /*
989 * Do not allow users to enable failover on the standby as we do not
990 * support sync to the cascading standby.
991 */
992 if (failover && *failover)
994 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
995 errmsg("cannot enable failover for a replication slot"
996 " on the standby"));
997 }
998
999 if (failover)
1000 {
1001 /*
1002 * Do not allow users to enable failover for temporary slots as we do
1003 * not support syncing temporary slots to the standby.
1004 */
1006 ereport(ERROR,
1007 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1008 errmsg("cannot enable failover for a temporary replication slot"));
1009
1011 {
1015
1016 update_slot = true;
1017 }
1018 }
1019
1021 {
1025
1026 update_slot = true;
1027 }
1028
1029 if (update_slot)
1030 {
1033 }
1034
1036}
1037
1038/*
1039 * Permanently drop the currently acquired replication slot.
1040 */
1041void
1043{
1045
1046 Assert(MyReplicationSlot != NULL);
1047
1048 /* slot isn't acquired anymore */
1049 MyReplicationSlot = NULL;
1050
1052}
1053
1054/*
1055 * Permanently drop the replication slot which will be released by the point
1056 * this function returns.
1057 */
1058static void
1060{
1061 char path[MAXPGPATH];
1062 char tmppath[MAXPGPATH];
1063
1064 /*
1065 * If some other backend ran this code concurrently with us, we might try
1066 * to delete a slot with a certain name while someone else was trying to
1067 * create a slot with the same name.
1068 */
1069 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1070
1071 /* Generate pathnames. */
1072 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1073 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1074
1075 /*
1076 * Rename the slot directory on disk, so that we'll no longer recognize
1077 * this as a valid slot. Note that if this fails, we've got to mark the
1078 * slot inactive before bailing out. If we're dropping an ephemeral or a
1079 * temporary slot, we better never fail hard as the caller won't expect
1080 * the slot to survive and this might get called during error handling.
1081 */
1082 if (rename(path, tmppath) == 0)
1083 {
1084 /*
1085 * We need to fsync() the directory we just renamed and its parent to
1086 * make sure that our changes are on disk in a crash-safe fashion. If
1087 * fsync() fails, we can't be sure whether the changes are on disk or
1088 * not. For now, we handle that by panicking;
1089 * StartupReplicationSlots() will try to straighten it out after
1090 * restart.
1091 */
1093 fsync_fname(tmppath, true);
1096 }
1097 else
1098 {
1099 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1100
1101 SpinLockAcquire(&slot->mutex);
1103 SpinLockRelease(&slot->mutex);
1104
1105 /* wake up anyone waiting on this slot */
1107
1108 ereport(fail_softly ? WARNING : ERROR,
1110 errmsg("could not rename file \"%s\" to \"%s\": %m",
1111 path, tmppath)));
1112 }
1113
1114 /*
1115 * The slot is definitely gone. Lock out concurrent scans of the array
1116 * long enough to kill it. It's OK to clear the active PID here without
1117 * grabbing the mutex because nobody else can be scanning the array here,
1118 * and nobody can be attached to this slot and thus access it without
1119 * scanning the array.
1120 *
1121 * Also wake up processes waiting for it.
1122 */
1123 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1125 slot->in_use = false;
1126 LWLockRelease(ReplicationSlotControlLock);
1128
1129 /*
1130 * Slot is dead and doesn't prevent resource removal anymore, recompute
1131 * limits.
1132 */
1135
1136 /*
1137 * If removing the directory fails, the worst thing that will happen is
1138 * that the user won't be able to create a new slot with the same name
1139 * until the next server restart. We warn about it, but that's all.
1140 */
1141 if (!rmtree(tmppath, true))
1143 (errmsg("could not remove directory \"%s\"", tmppath)));
1144
1145 /*
1146 * Drop the statistics entry for the replication slot. Do this while
1147 * holding ReplicationSlotAllocationLock so that we don't drop a
1148 * statistics entry for another slot with the same name just created in
1149 * another session.
1150 */
1151 if (SlotIsLogical(slot))
1153
1154 /*
1155 * We release this at the very end, so that nobody starts trying to create
1156 * a slot while we're still cleaning up the detritus of the old one.
1157 */
1158 LWLockRelease(ReplicationSlotAllocationLock);
1159}
1160
1161/*
1162 * Serialize the currently acquired slot's state from memory to disk, thereby
1163 * guaranteeing the current state will survive a crash.
 *
 * The caller must have acquired a slot: MyReplicationSlot is asserted to be
 * non-NULL on entry.
1164 */
1165void
1167{
1168 char path[MAXPGPATH];
1169
1170 Assert(MyReplicationSlot != NULL);
1171
1174}
1175
1176/*
1177 * Signal that it would be useful if the currently acquired slot would be
1178 * flushed out to disk.
1179 *
1180 * Note that the actual flush to disk can be delayed for a long time; if an
1181 * immediate flush is required for correctness, explicitly call
 * ReplicationSlotSave() instead.
 *
 * MyReplicationSlot must be non-NULL (asserted below). The dirty flag is set
 * while holding the slot's spinlock so concurrent readers of the slot see a
 * consistent state.
1182 */
1183void
1185{
1187
1188 Assert(MyReplicationSlot != NULL);
1189
1190 SpinLockAcquire(&slot->mutex);
1192 MyReplicationSlot->dirty = true;
1193 SpinLockRelease(&slot->mutex);
1194}
1195
1196/*
1197 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1198 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
 *
 * The slot must not be NULL (asserted below); the slot's spinlock is held
 * while its in-memory state is updated.
1199 */
1200void
1202{
1204
1205 Assert(slot != NULL);
1207
1208 SpinLockAcquire(&slot->mutex);
1210 SpinLockRelease(&slot->mutex);
1211
1214}
1215
1216/*
1217 * Compute the oldest xmin across all slots and store it in the ProcArray.
1218 *
1219 * If already_locked is true, both the ReplicationSlotControlLock and the
1220 * ProcArrayLock have already been acquired exclusively. It is crucial that the
1221 * caller first acquires the ReplicationSlotControlLock, followed by the
1222 * ProcArrayLock, to prevent any undetectable deadlocks since this function
1223 * acquires them in that order.
 *
 * Slots that are not in use or that have been invalidated contribute to
 * neither the aggregated data xmin nor the aggregated catalog xmin.
1224 */
1225void
1226ReplicationSlotsComputeRequiredXmin(bool already_locked)
1227{
1228 int i;
1230 TransactionId agg_catalog_xmin = InvalidTransactionId;
1231
1232 Assert(ReplicationSlotCtl != NULL);
1233 Assert(!already_locked ||
1234 (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
1235 LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
1236
1237 /*
1238 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1239 * values, so no backend updates the initial xmin for newly created slot
1240 * concurrently. A shared lock is used here to minimize lock contention,
1241 * especially when many slots exist and advancements occur frequently.
1242 * This is safe since an exclusive lock is taken during initial slot xmin
1243 * update in slot creation.
1244 *
1245 * One might think that we can hold the ProcArrayLock exclusively and
1246 * update the slot xmin values, but it could increase lock contention on
1247 * the ProcArrayLock, which is not great since this function can be called
1248 * at non-negligible frequency.
1249 *
1250 * Concurrent invocation of this function may cause the computed slot xmin
1251 * to regress. However, this is harmless because tuples prior to the most
1252 * recent xmin are no longer useful once advancement occurs (see
1253 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1254 * before updating the effective_xmin). Thus, such regression merely
1255 * prevents VACUUM from prematurely removing tuples without causing the
1256 * early deletion of required data.
1257 */
1258 if (!already_locked)
1259 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1260
1262 {
1264 TransactionId effective_xmin;
1265 TransactionId effective_catalog_xmin;
1266 bool invalidated;
1267
1268 if (!s->in_use)
1269 continue;
1270
1272 effective_xmin = s->effective_xmin;
1273 effective_catalog_xmin = s->effective_catalog_xmin;
1274 invalidated = s->data.invalidated != RS_INVAL_NONE;
1276
1277 /* invalidated slots need not apply */
1278 if (invalidated)
1279 continue;
1280
1281 /* check the data xmin */
1282 if (TransactionIdIsValid(effective_xmin) &&
1283 (!TransactionIdIsValid(agg_xmin) ||
1284 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1285 agg_xmin = effective_xmin;
1286
1287 /* check the catalog xmin */
1288 if (TransactionIdIsValid(effective_catalog_xmin) &&
1289 (!TransactionIdIsValid(agg_catalog_xmin) ||
1290 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1291 agg_catalog_xmin = effective_catalog_xmin;
1292 }
1293
 /*
  * Publish the aggregated horizons while still holding
  * ReplicationSlotControlLock; already_locked tells the ProcArray side
  * whether ProcArrayLock is already held by us.
  */
1294 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1295
1296 if (!already_locked)
1297 LWLockRelease(ReplicationSlotControlLock);
1298}
1299
1300/*
1301 * Compute the oldest restart LSN across all slots and inform xlog module.
1302 *
1303 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1304 * purpose, we don't try to account for that, because this module doesn't
1305 * know what to compare against.
 *
 * Slots that are not in use or that have been invalidated are ignored.
1306 */
1307void
1309{
1310 int i;
1311 XLogRecPtr min_required = InvalidXLogRecPtr;
1312
1313 Assert(ReplicationSlotCtl != NULL);
1314
1315 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1317 {
1319 XLogRecPtr restart_lsn;
1320 XLogRecPtr last_saved_restart_lsn;
1321 bool invalidated;
1322 ReplicationSlotPersistency persistency;
1323
1324 if (!s->in_use)
1325 continue;
1326
1328 persistency = s->data.persistency;
1329 restart_lsn = s->data.restart_lsn;
1330 invalidated = s->data.invalidated != RS_INVAL_NONE;
1331 last_saved_restart_lsn = s->last_saved_restart_lsn;
1333
1334 /* invalidated slots need not apply */
1335 if (invalidated)
1336 continue;
1337
1338 /*
1339 * For persistent slots, use last_saved_restart_lsn to compute the
1340 * oldest LSN for removal of WAL segments. The segments between
1341 * last_saved_restart_lsn and restart_lsn might be needed by a
1342 * persistent slot in the case of database crash. Non-persistent
1343 * slots can't survive the database crash, so we don't care about
1344 * last_saved_restart_lsn for them.
1345 */
1346 if (persistency == RS_PERSISTENT)
1347 {
1348 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1349 restart_lsn > last_saved_restart_lsn)
1350 {
1351 restart_lsn = last_saved_restart_lsn;
1352 }
1353 }
1354
1355 if (XLogRecPtrIsValid(restart_lsn) &&
1356 (!XLogRecPtrIsValid(min_required) ||
1357 restart_lsn < min_required))
1358 min_required = restart_lsn;
1359 }
1360 LWLockRelease(ReplicationSlotControlLock);
1361
1363}
1364
1365/*
1366 * Compute the oldest WAL LSN required by *logical* decoding slots.
1367 *
1368 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1369 * slots exist.
1370 *
1371 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1372 * ignores physical replication slots.
1373 *
1374 * The results aren't required frequently, so we don't maintain a precomputed
1375 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 *
 * Invalidated logical slots, and slots whose effective restart LSN is
 * invalid, do not contribute to the result.
1376 */
1379{
1381 int i;
1382
1384 return InvalidXLogRecPtr;
1385
1386 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1387
1389 {
1390 ReplicationSlot *s;
1391 XLogRecPtr restart_lsn;
1392 XLogRecPtr last_saved_restart_lsn;
1393 bool invalidated;
1394 ReplicationSlotPersistency persistency;
1395
1397
1398 /* cannot change while ReplicationSlotControlLock is held */
1399 if (!s->in_use)
1400 continue;
1401
1402 /* we're only interested in logical slots */
1403 if (!SlotIsLogical(s))
1404 continue;
1405
1406 /* read once, it's ok if it increases while we're checking */
1408 persistency = s->data.persistency;
1409 restart_lsn = s->data.restart_lsn;
1410 invalidated = s->data.invalidated != RS_INVAL_NONE;
1411 last_saved_restart_lsn = s->last_saved_restart_lsn;
1413
1414 /* invalidated slots need not apply */
1415 if (invalidated)
1416 continue;
1417
1418 /*
1419 * For persistent slots, use last_saved_restart_lsn to compute the
1420 * oldest LSN for removal of WAL segments. The segments between
1421 * last_saved_restart_lsn and restart_lsn might be needed by a
1422 * persistent slot in the case of database crash. Non-persistent
1423 * slots can't survive the database crash, so we don't care about
1424 * last_saved_restart_lsn for them.
1425 */
1426 if (persistency == RS_PERSISTENT)
1427 {
1428 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1429 restart_lsn > last_saved_restart_lsn)
1430 {
1431 restart_lsn = last_saved_restart_lsn;
1432 }
1433 }
1434
1435 if (!XLogRecPtrIsValid(restart_lsn))
1436 continue;
1437
1438 if (!XLogRecPtrIsValid(result) ||
1439 restart_lsn < result)
1440 result = restart_lsn;
1441 }
1442
1443 LWLockRelease(ReplicationSlotControlLock);
1444
1445 return result;
1446}
1447
1448/*
1449 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1450 * passed database oid.
1451 *
1452 * Returns true if there are any slots referencing the database. *nslots will
1453 * be set to the absolute number of slots in the database, *nactive to ones
1454 * currently active.
 *
 * Only logical slots are considered, since physical slots are not tied to a
 * database. Invalidated slots are deliberately included in the counts. Both
 * output counters are reset to zero on entry.
1455 */
1456bool
1457ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1458{
1459 int i;
1460
1461 *nslots = *nactive = 0;
1462
1464 return false;
1465
1466 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1468 {
1469 ReplicationSlot *s;
1470
1472
1473 /* cannot change while ReplicationSlotControlLock is held */
1474 if (!s->in_use)
1475 continue;
1476
1477 /* only logical slots are database specific, skip */
1478 if (!SlotIsLogical(s))
1479 continue;
1480
1481 /* not our database, skip */
1482 if (s->data.database != dboid)
1483 continue;
1484
1485 /* NB: intentionally counting invalidated slots */
1486
1487 /* count slots with spinlock held */
1489 (*nslots)++;
1491 (*nactive)++;
1493 }
1494 LWLockRelease(ReplicationSlotControlLock);
1495
1496 if (*nslots > 0)
1497 return true;
1498 return false;
1499}
1500
1501/*
1502 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1503 * passed database oid. The caller should hold an exclusive lock on the
1504 * pg_database oid for the database to prevent creation of new slots on the db
1505 * or replay from existing slots.
1506 *
1507 * Another session that concurrently acquires an existing slot on the target DB
1508 * (most likely to drop it) may cause this function to ERROR. If that happens
1509 * it may have dropped some but not all slots.
1510 *
1511 * This routine isn't as efficient as it could be - but we don't drop
1512 * databases often, especially databases with lots of slots.
1513 *
1514 * If it drops the last logical slot in the cluster, it requests to disable
1515 * logical decoding.
1516 */
1517void
1519{
1520 int i;
1521 bool found_valid_logicalslot;
1522 bool dropped = false;
1523
1525 return;
1526
1527restart:
1528 found_valid_logicalslot = false;
1529 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1531 {
1532 ReplicationSlot *s;
1533 char *slotname;
1534 ProcNumber active_proc;
1535
1537
1538 /* cannot change while ReplicationSlotControlLock is held */
1539 if (!s->in_use)
1540 continue;
1541
1542 /* only logical slots are database specific, skip */
1543 if (!SlotIsLogical(s))
1544 continue;
1545
1546 /*
1547 * Check logical slots on other databases too so we can disable
1548 * logical decoding only if no slots in the cluster.
1549 */
1551 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
1553
1554 /* not our database, skip */
1555 if (s->data.database != dboid)
1556 continue;
1557
1558 /* NB: intentionally including invalidated slots to drop */
1559
1560 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1562 /* can't change while ReplicationSlotControlLock is held */
1563 slotname = NameStr(s->data.name);
1564 active_proc = s->active_proc;
1565 if (active_proc == INVALID_PROC_NUMBER)
1566 {
1569 }
1571
1572 /*
1573 * Even though we hold an exclusive lock on the database object, a
1574 * logical slot for that DB can still be active, e.g. if it's
1575 * concurrently being dropped by a backend connected to another DB.
1576 *
1577 * That's fairly unlikely in practice, so we'll just bail out.
1578 *
1579 * The slot sync worker holds a shared lock on the database before
1580 * operating on synced logical slots to avoid conflict with the drop
1581 * happening here. The persistent synced slots are thus safe but there
1582 * is a possibility that the slot sync worker has created a temporary
1583 * slot (which stays active even on release) and we are trying to drop
1584 * that here. In practice, the chances of hitting this scenario are
1585 * low because, during slot synchronization, the temporary slot is
1586 * immediately converted to persistent and thus is safe due to the
1587 * shared lock taken on the database. So, we'll just bail out in such
1588 * a case.
1589 *
1590 * XXX: We can consider shutting down the slot sync worker before
1591 * trying to drop synced temporary slots here.
1592 */
1593 if (active_proc != INVALID_PROC_NUMBER)
1594 ereport(ERROR,
1595 (errcode(ERRCODE_OBJECT_IN_USE),
1596 errmsg("replication slot \"%s\" is active for PID %d",
1597 slotname, GetPGProcByNumber(active_proc)->pid)));
1598
1599 /*
1600 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1601 * holding ReplicationSlotControlLock over filesystem operations,
1602 * release ReplicationSlotControlLock and use
1603 * ReplicationSlotDropAcquired.
1604 *
1605 * As that means the set of slots could change, restart scan from the
1606 * beginning each time we release the lock.
1607 */
1608 LWLockRelease(ReplicationSlotControlLock);
1610 dropped = true;
1611 goto restart;
1612 }
1613 LWLockRelease(ReplicationSlotControlLock);
1614
 /*
  * found_valid_logicalslot was recomputed from scratch on the final
  * (complete) scan, so it reflects the cluster state after all drops.
  */
1615 if (dropped && !found_valid_logicalslot)
1617}
1618
1619/*
1620 * Returns true if there is at least one in-use valid logical replication slot.
 *
 * Physical slots and invalidated logical slots are skipped; the scan stops
 * at the first qualifying slot.
1621 */
1622bool
1624{
1625 bool found = false;
1626
1628 return false;
1629
1630 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1632 {
1633 ReplicationSlot *s;
1634 bool invalidated;
1635
1637
1638 /* cannot change while ReplicationSlotControlLock is held */
1639 if (!s->in_use)
1640 continue;
1641
1642 if (SlotIsPhysical(s))
1643 continue;
1644
1646 invalidated = s->data.invalidated != RS_INVAL_NONE;
1648
1649 if (invalidated)
1650 continue;
1651
1652 found = true;
1653 break;
1654 }
1655 LWLockRelease(ReplicationSlotControlLock);
1656
1657 return found;
1658}
1659
1660/*
1661 * Check whether the server's configuration supports using replication
1662 * slots.
 *
 * When repack is true the check is made on behalf of REPACK, and
 * max_repack_replication_slots must be > 0; otherwise max_replication_slots
 * must be > 0. In both cases the wal_level requirement below also applies.
 * Any unmet requirement raises an ERROR.
1663 */
1664void
1665CheckSlotRequirements(bool repack)
1666{
1667 /*
1668 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1669 * needs the same check.
1670 */
1671
1672 if (!repack && max_replication_slots == 0)
1673 ereport(ERROR,
1674 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1675 errmsg("replication slots can only be used if \"%s\" > 0",
1676 "max_replication_slots"));
1677
1678 if (repack && max_repack_replication_slots == 0)
1679 ereport(ERROR,
1680 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1681 errmsg("REPACK can only be used if \"%s\" > 0",
1682 "max_repack_replication_slots"));
1683
1685 ereport(ERROR,
1686 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1687 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1688}
1689
1690/*
1691 * Check whether the user has privilege to use replication slots.
 *
 * On failure, raises an ERROR with ERRCODE_INSUFFICIENT_PRIVILEGE whose
 * detail tells the user that the REPLICATION role attribute is required.
1692 */
1693void
1695{
1697 ereport(ERROR,
1698 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1699 errmsg("permission denied to use replication slots"),
1700 errdetail("Only roles with the %s attribute may use replication slots.",
1701 "REPLICATION")));
1702}
1703
1704/*
1705 * Reserve WAL for the currently active slot.
1706 *
1707 * Compute and set restart_lsn in a manner that's appropriate for the type of
1708 * the slot and concurrency safe.
 *
 * Raises an ERROR if the WAL at the chosen restart_lsn turns out to have been
 * removed concurrently. For logical slots on a primary (not in recovery), a
 * standby snapshot is logged and flushed after ReplicationSlotAllocationLock
 * has been released.
1709 */
1710void
1712{
1714 XLogSegNo segno;
1715 XLogRecPtr restart_lsn;
1716
1717 Assert(slot != NULL);
1720
1721 /*
1722 * The replication slot mechanism is used to prevent the removal of
1723 * required WAL.
1724 *
1725 * Acquire an exclusive lock to prevent the checkpoint process from
1726 * concurrently computing the minimum slot LSN (see
1727 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1728 * replication cannot be removed during a checkpoint.
1729 *
1730 * The mechanism is reliable because if WAL reservation occurs first, the
1731 * checkpoint must wait for the restart_lsn update before determining the
1732 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1733 * first, subsequent WAL reservations will select positions at or beyond
1734 * the redo pointer of that checkpoint.
1735 */
1736 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1737
1738 /*
1739 * For logical slots log a standby snapshot and start logical decoding at
1740 * exactly that position. That allows the slot to start up more quickly.
1741 * But on a standby we cannot do WAL writes, so just use the replay
1742 * pointer; effectively, an attempt to create a logical slot on standby
1743 * will cause it to wait for an xl_running_xact record to be logged
1744 * independently on the primary, so that a snapshot can be built using the
1745 * record.
1746 *
1747 * None of this is needed (or indeed helpful) for physical slots as
1748 * they'll start replay at the last logged checkpoint anyway. Instead,
1749 * return the location of the last redo LSN, where a base backup has to
1750 * start replay at.
1751 */
1752 if (SlotIsPhysical(slot))
1753 restart_lsn = GetRedoRecPtr();
1754 else if (RecoveryInProgress())
1755 restart_lsn = GetXLogReplayRecPtr(NULL);
1756 else
1757 restart_lsn = GetXLogInsertRecPtr();
1758
1759 SpinLockAcquire(&slot->mutex);
1760 slot->data.restart_lsn = restart_lsn;
1761 SpinLockRelease(&slot->mutex);
1762
1763 /* prevent WAL removal as fast as possible */
1765
1766 /* Checkpoint shouldn't remove the required WAL. */
1768 if (XLogGetLastRemovedSegno() >= segno)
1769 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1770 NameStr(slot->data.name));
1771
1772 LWLockRelease(ReplicationSlotAllocationLock);
1773
1774 if (!RecoveryInProgress() && SlotIsLogical(slot))
1775 {
1776 XLogRecPtr flushptr;
1777
1778 /* make sure we have enough information to start */
1779 flushptr = LogStandbySnapshot(InvalidOid);
1780
1781 /* and make sure it's fsynced to disk */
1782 XLogFlush(flushptr);
1783 }
1784}
1785
1786/*
1787 * Report that replication slot needs to be invalidated
 *
 * Emits a LOG message. If terminating is true, the message says that process
 * pid is being terminated to release the slot; otherwise it says the slot is
 * being invalidated. The errdetail depends on cause:
 *   - RS_INVAL_WAL_REMOVED: how far restart_lsn is behind oldestLSN, plus a
 *     hint about max_slot_wal_keep_size;
 *   - RS_INVAL_HORIZON: the conflicting snapshotConflictHorizon xid;
 *   - RS_INVAL_WAL_LEVEL: the wal_level requirement for logical decoding on
 *     a standby;
 *   - RS_INVAL_IDLE_TIMEOUT: slot_idle_seconds compared with the configured
 *     timeout, plus a hint about idle_replication_slot_timeout.
 * Arguments that are irrelevant to the given cause are ignored.
1788 */
1789static void
1791 bool terminating,
1792 int pid,
1793 NameData slotname,
1794 XLogRecPtr restart_lsn,
1795 XLogRecPtr oldestLSN,
1796 TransactionId snapshotConflictHorizon,
1797 long slot_idle_seconds)
1798{
1799 StringInfoData err_detail;
1800 StringInfoData err_hint;
1801
1802 initStringInfo(&err_detail);
1803 initStringInfo(&err_hint);
1804
1805 switch (cause)
1806 {
1808 {
1809 uint64 ex = oldestLSN - restart_lsn;
1810
1811 appendStringInfo(&err_detail,
1812 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1813 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1814 ex),
1815 LSN_FORMAT_ARGS(restart_lsn),
1816 ex);
1817 /* translator: %s is a GUC variable name */
1818 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1819 "max_slot_wal_keep_size");
1820 break;
1821 }
1822 case RS_INVAL_HORIZON:
1823 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1824 snapshotConflictHorizon);
1825 break;
1826
1827 case RS_INVAL_WAL_LEVEL:
1828 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1829 break;
1830
1832 {
1833 /* translator: %s is a GUC variable name */
1834 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1835 slot_idle_seconds, "idle_replication_slot_timeout",
1837 /* translator: %s is a GUC variable name */
1838 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1839 "idle_replication_slot_timeout");
1840 break;
1841 }
1842 case RS_INVAL_NONE:
1844 }
1845
 /* the hint is attached only if the chosen cause produced one */
1846 ereport(LOG,
1847 terminating ?
1848 errmsg("terminating process %d to release replication slot \"%s\"",
1849 pid, NameStr(slotname)) :
1850 errmsg("invalidating obsolete replication slot \"%s\"",
1851 NameStr(slotname)),
1852 errdetail_internal("%s", err_detail.data),
1853 err_hint.len ? errhint("%s", err_hint.data) : 0);
1854
1855 pfree(err_detail.data);
1856 pfree(err_hint.data);
1857}
1858
1859/*
1860 * Can we invalidate an idle replication slot?
1861 *
1862 * Idle timeout invalidation is allowed only when:
1863 *
1864 * 1. Idle timeout is set
1865 * 2. Slot has reserved WAL
1866 * 3. Slot is inactive
1867 * 4. The slot is not being synced from the primary while the server is in
1868 * recovery. This is because synced slots are always considered to be
1869 * inactive because they don't perform logical decoding to produce changes.
 *
 * Returns true only if all of the above conditions hold.
1870 */
1871static inline bool
1873{
1876 s->inactive_since > 0 &&
1877 !(RecoveryInProgress() && s->data.synced));
1878}
1879
1880/*
1881 * DetermineSlotInvalidationCause - Determine the cause for which a slot
1882 * becomes invalid among the given possible causes.
1883 *
1884 * This function sequentially checks all possible invalidation causes and
1885 * returns the first one for which the slot is eligible for invalidation.
 *
 * The causes are checked in this order: RS_INVAL_WAL_REMOVED,
 * RS_INVAL_HORIZON, RS_INVAL_WAL_LEVEL, RS_INVAL_IDLE_TIMEOUT. Returns
 * RS_INVAL_NONE if none of the requested causes applies.
 *
 * possible_causes must not be RS_INVAL_NONE. When it includes
 * RS_INVAL_IDLE_TIMEOUT, now must be > 0. *inactive_since is written only
 * when RS_INVAL_IDLE_TIMEOUT is returned.
1886 */
1889 XLogRecPtr oldestLSN, Oid dboid,
1890 TransactionId snapshotConflictHorizon,
1891 TimestampTz *inactive_since, TimestampTz now)
1892{
1893 Assert(possible_causes != RS_INVAL_NONE);
1894
1895 if (possible_causes & RS_INVAL_WAL_REMOVED)
1896 {
1897 XLogRecPtr restart_lsn = s->data.restart_lsn;
1898
1899 if (XLogRecPtrIsValid(restart_lsn) &&
1900 restart_lsn < oldestLSN)
1901 return RS_INVAL_WAL_REMOVED;
1902 }
1903
1904 if (possible_causes & RS_INVAL_HORIZON)
1905 {
1906 /* invalid DB oid signals a shared relation */
1907 if (SlotIsLogical(s) &&
1908 (dboid == InvalidOid || dboid == s->data.database))
1909 {
1910 TransactionId effective_xmin = s->effective_xmin;
1911 TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1912
1913 if (TransactionIdIsValid(effective_xmin) &&
1914 TransactionIdPrecedesOrEquals(effective_xmin,
1915 snapshotConflictHorizon))
1916 return RS_INVAL_HORIZON;
1917 else if (TransactionIdIsValid(catalog_effective_xmin) &&
1918 TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1919 snapshotConflictHorizon))
1920 return RS_INVAL_HORIZON;
1921 }
1922 }
1923
1924 if (possible_causes & RS_INVAL_WAL_LEVEL)
1925 {
1926 if (SlotIsLogical(s))
1927 return RS_INVAL_WAL_LEVEL;
1928 }
1929
1930 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1931 {
1932 Assert(now > 0);
1933
1934 if (CanInvalidateIdleSlot(s))
1935 {
1936 /*
1937 * Simulate the invalidation due to idle_timeout to test the
1938 * timeout behavior promptly, without waiting for it to trigger
1939 * naturally.
1940 */
1941#ifdef USE_INJECTION_POINTS
1942 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1943 {
1944 *inactive_since = 0; /* since the beginning of time */
1945 return RS_INVAL_IDLE_TIMEOUT;
1946 }
1947#endif
1948
1949 /*
1950 * Check if the slot needs to be invalidated due to
1951 * idle_replication_slot_timeout GUC.
1952 */
1955 {
1956 *inactive_since = s->inactive_since;
1957 return RS_INVAL_IDLE_TIMEOUT;
1958 }
1959 }
1960 }
1961
1962 return RS_INVAL_NONE;
1963}
1964
1965/*
1966 * Helper for InvalidateObsoleteReplicationSlots
1967 *
1968 * Acquires the given slot and marks it invalid, if necessary and possible.
1969 *
1970 * Returns true if the slot was invalidated.
1971 *
1972 * Set *released_lock_out if ReplicationSlotControlLock was released in the
1973 * interim (and in that case we're not holding the lock at return, otherwise
1974 * we are).
1975 *
1976 * This is inherently racy, because we release the LWLock
1977 * for syscalls, so caller must restart if we return true.
 *
 * The caller must hold ReplicationSlotControlLock in shared mode on entry
 * (asserted at the top of each loop iteration). If the slot is owned by
 * another process, that process is signalled (once per distinct PID) and we
 * wait on the slot's condition variable before re-checking.
1978 */
1979static bool
1981 ReplicationSlot *s,
1982 XLogRecPtr oldestLSN,
1983 Oid dboid, TransactionId snapshotConflictHorizon,
1984 bool *released_lock_out)
1985{
1986 int last_signaled_pid = 0;
1987 bool released_lock = false;
1988 bool invalidated = false;
1989 TimestampTz inactive_since = 0;
1990
1991 for (;;)
1992 {
1993 XLogRecPtr restart_lsn;
1994 NameData slotname;
1995 ProcNumber active_proc;
1996 int active_pid = 0;
1998 TimestampTz now = 0;
1999 long slot_idle_secs = 0;
2000
2001 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
2002
2003 if (!s->in_use)
2004 {
2005 if (released_lock)
2006 LWLockRelease(ReplicationSlotControlLock);
2007 break;
2008 }
2009
2010 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
2011 {
2012 /*
2013 * Assign the current time here to avoid system call overhead
2014 * while holding the spinlock in subsequent code.
2015 */
2017 }
2018
2019 /*
2020 * Check if the slot needs to be invalidated. If it needs to be
2021 * invalidated, and is not currently acquired, acquire it and mark it
2022 * as having been invalidated. We do this with the spinlock held to
2023 * avoid race conditions -- for example the restart_lsn could move
2024 * forward, or the slot could be dropped.
2025 */
2027
2028 restart_lsn = s->data.restart_lsn;
2029
2030 /* we do nothing if the slot is already invalid */
2031 if (s->data.invalidated == RS_INVAL_NONE)
2032 invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
2033 s, oldestLSN,
2034 dboid,
2035 snapshotConflictHorizon,
2036 &inactive_since,
2037 now);
2038
2039 /* if there's no invalidation, we're done */
2040 if (invalidation_cause == RS_INVAL_NONE)
2041 {
2043 if (released_lock)
2044 LWLockRelease(ReplicationSlotControlLock);
2045 break;
2046 }
2047
2048 slotname = s->data.name;
2049 active_proc = s->active_proc;
2050
2051 /*
2052 * If the slot can be acquired, do so and mark it invalidated
2053 * immediately. Otherwise we'll signal the owning process, below, and
2054 * retry.
2055 *
2056 * Note: Unlike other slot attributes, slot's inactive_since can't be
2057 * changed until the acquired slot is released or the owning process
2058 * is terminated. So, the inactive slot can only be invalidated
2059 * immediately without being terminated.
2060 */
2061 if (active_proc == INVALID_PROC_NUMBER)
2062 {
2065 s->data.invalidated = invalidation_cause;
2066
2067 /*
2068 * XXX: We should consider not overwriting restart_lsn and instead
2069 * just rely on .invalidated.
2070 */
2071 if (invalidation_cause == RS_INVAL_WAL_REMOVED)
2072 {
2075 }
2076
2077 /* Let caller know */
2078 invalidated = true;
2079 }
2080 else
2081 {
2082 active_pid = GetPGProcByNumber(active_proc)->pid;
2083 Assert(active_pid != 0);
2084 }
2085
2087
2088 /*
2089 * Calculate the idle time duration of the slot if slot is marked
2090 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2091 */
2092 if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
2093 {
2094 int slot_idle_usecs;
2095
2096 TimestampDifference(inactive_since, now, &slot_idle_secs,
2097 &slot_idle_usecs);
2098 }
2099
2100 if (active_proc != INVALID_PROC_NUMBER)
2101 {
2102 /*
2103 * Prepare the sleep on the slot's condition variable before
2104 * releasing the lock, to close a possible race condition if the
2105 * slot is released before the sleep below.
2106 */
2108
2109 LWLockRelease(ReplicationSlotControlLock);
2110 released_lock = true;
2111
2112 /*
2113 * Signal to terminate the process that owns the slot, if we
2114 * haven't already signalled it. (Avoidance of repeated
2115 * signalling is the only reason for there to be a loop in this
2116 * routine; otherwise we could rely on caller's restart loop.)
2117 *
2118 * There is the race condition that other process may own the slot
2119 * after its current owner process is terminated and before this
2120 * process owns it. To handle that, we signal only if the PID of
2121 * the owning process has changed from the previous time. (This
2122 * logic assumes that the same PID is not reused very quickly.)
2123 */
2124 if (last_signaled_pid != active_pid)
2125 {
2126 ReportSlotInvalidation(invalidation_cause, true, active_pid,
2127 slotname, restart_lsn,
2128 oldestLSN, snapshotConflictHorizon,
2129 slot_idle_secs);
2130
2131 if (MyBackendType == B_STARTUP)
2132 (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
2133 active_pid,
2135 else
2136 (void) kill(active_pid, SIGTERM);
2137
2138 last_signaled_pid = active_pid;
2139 }
2140
2141 /* Wait until the slot is released. */
2143 WAIT_EVENT_REPLICATION_SLOT_DROP);
2144
2145 /*
2146 * Re-acquire lock and start over; we expect to invalidate the
2147 * slot next time (unless another process acquires the slot in the
2148 * meantime).
2149 *
2150 * Note: It is possible for a slot to advance its restart_lsn or
2151 * xmin values sufficiently between when we release the mutex and
2152 * when we recheck, moving from a conflicting state to a non
2153 * conflicting state. This is intentional and safe: if the slot
2154 * has caught up while we're busy here, the resources we were
2155 * concerned about (WAL segments or tuples) have not yet been
2156 * removed, and there's no reason to invalidate the slot.
2157 */
2158 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2159 continue;
2160 }
2161 else
2162 {
2163 /*
2164 * We hold the slot now and have already invalidated it; flush it
2165 * to ensure that state persists.
2166 *
2167 * Don't want to hold ReplicationSlotControlLock across file
2168 * system operations, so release it now but be sure to tell caller
2169 * to restart from scratch.
2170 */
2171 LWLockRelease(ReplicationSlotControlLock);
2172 released_lock = true;
2173
2174 /* Make sure the invalidated state persists across server restart */
2178
2179 ReportSlotInvalidation(invalidation_cause, false, active_pid,
2180 slotname, restart_lsn,
2181 oldestLSN, snapshotConflictHorizon,
2182 slot_idle_secs);
2183
2184 /* done with this slot for now */
2185 break;
2186 }
2187 }
2188
2189 Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2190
2191 *released_lock_out = released_lock;
2192 return invalidated;
2193}
2194
2195/*
2196 * Invalidate slots that require resources about to be removed.
2197 *
2198 * Returns true if any slot has been invalidated.
2199 *
2200 * Whether a slot needs to be invalidated depends on the invalidation cause.
2201 * A slot is invalidated if it:
2202 * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
2203 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2204 * db; dboid may be InvalidOid for shared relations
2205 * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2206 * logical.
2207 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2208 * "idle_replication_slot_timeout" duration.
2209 *
2210 * Note: This function attempts to invalidate the slot for multiple possible
2211 * causes in a single pass, minimizing redundant iterations. The "cause"
2212 * parameter can be a MASK representing one or more of the defined causes.
2213 *
2214 * If it invalidates the last logical slot in the cluster, it requests to
2215 * disable logical decoding.
2216 *
2217 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
 *
 * Whenever the helper releases ReplicationSlotControlLock, the scan restarts
 * from the first slot, so the set of slots examined reflects any concurrent
 * changes.
2218 */
2219bool
2221 XLogSegNo oldestSegno, Oid dboid,
2222 TransactionId snapshotConflictHorizon)
2223{
2224 XLogRecPtr oldestLSN;
2225 bool invalidated = false;
2226 bool invalidated_logical = false;
2227 bool found_valid_logicalslot;
2228
2229 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2230 Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2231 Assert(possible_causes != RS_INVAL_NONE);
2232
2234 return invalidated;
2235
2236 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2237
2238restart:
2239 found_valid_logicalslot = false;
2240 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2242 {
2244 bool released_lock = false;
2245
2246 if (!s->in_use)
2247 continue;
2248
2249 /* Prevent invalidation of logical slots during binary upgrade */
2251 {
2253 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2255
2256 continue;
2257 }
2258
2259 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2260 dboid, snapshotConflictHorizon,
2261 &released_lock))
2262 {
2263 Assert(released_lock);
2264
2265 /* Remember we have invalidated a physical or logical slot */
2266 invalidated = true;
2267
2268 /*
2269 * Additionally, remember we have invalidated a logical slot as we
2270 * can request disabling logical decoding later.
2271 */
2272 if (SlotIsLogical(s))
2273 invalidated_logical = true;
2274 }
2275 else
2276 {
2277 /*
2278 * We need to check if the slot is invalidated here since
2279 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2280 * is already invalidated.
2281 */
2283 found_valid_logicalslot |=
2286 }
2287
2288 /* if the lock was released, start from scratch */
2289 if (released_lock)
2290 goto restart;
2291 }
2292 LWLockRelease(ReplicationSlotControlLock);
2293
2294 /*
2295 * If any slots have been invalidated, recalculate the resource limits.
2296 */
2297 if (invalidated)
2298 {
2301 }
2302
2303 /*
2304 * Request the checkpointer to disable logical decoding if no valid
2305 * logical slots remain. If called by the checkpointer during a
2306 * checkpoint, only the request is initiated; actual deactivation is
2307 * deferred until after the checkpoint completes.
2308 */
2309 if (invalidated_logical && !found_valid_logicalslot)
2311
2312 return invalidated;
2313}
2314
2315/*
2316 * Flush all replication slots to disk.
2317 *
2318 * It is convenient to flush dirty replication slots at the time of checkpoint.
2319 * Additionally, in case of a shutdown checkpoint, we also identify the slots
2320 * for which the confirmed_flush LSN has been updated since the last time it
2321 * was saved and flush them.
2322 */
2323void
2324CheckPointReplicationSlots(bool is_shutdown)
2325{
2326 int i;
2327 bool last_saved_restart_lsn_updated = false;
2328
2329 elog(DEBUG1, "performing replication slot checkpoint");
2330
2331 /*
2332 * Prevent any slot from being created/dropped while we're active. As we
2333 * explicitly do *not* want to block iterating over replication_slots or
2334 * acquiring a slot we cannot take the control lock - but that's OK,
2335 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2336 * enough to guarantee that nobody can change the in_use bits on us.
2337 *
2338 * Additionally, acquiring the Allocation lock is necessary to serialize
2339 * the slot flush process with concurrent slot WAL reservation. This
2340 * ensures that the WAL position being reserved is either flushed to disk
2341 * or is beyond or equal to the redo pointer of the current checkpoint
2342 * (See ReplicationSlotReserveWal for details).
2343 */
2344 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2345
2347 {
2349 char path[MAXPGPATH];
2350
2351 if (!s->in_use)
2352 continue;
2353
2354 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2355 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2356
2357 /*
2358 * Slot's data is not flushed each time the confirmed_flush LSN is
2359 * updated as that could lead to frequent writes. However, we decide
2360 * to force a flush of all logical slots' data at the time of shutdown
2361 * if the confirmed_flush LSN is changed since we last flushed it to
2362 * disk. This helps in avoiding an unnecessary retreat of the
2363 * confirmed_flush LSN after restart.
2364 */
2365 if (is_shutdown && SlotIsLogical(s))
2366 {
2368
2369 if (s->data.invalidated == RS_INVAL_NONE &&
2371 {
2372 s->just_dirtied = true;
2373 s->dirty = true;
2374 }
2376 }
2377
2378 /*
2379 * Track if we're going to update slot's last_saved_restart_lsn. We
2380 * need this to know if we need to recompute the required LSN.
2381 */
2383 last_saved_restart_lsn_updated = true;
2384
2385 SaveSlotToPath(s, path, LOG);
2386 }
2387 LWLockRelease(ReplicationSlotAllocationLock);
2388
2389 /*
2390 * Recompute the required LSN if SaveSlotToPath() updated
2391 * last_saved_restart_lsn for any slot.
2392 */
2393 if (last_saved_restart_lsn_updated)
2395}
2396
2397/*
2398 * Load all replication slots from disk into memory at server startup. This
2399 * needs to be run before we start crash recovery.
2400 */
2401void
2403{
2404 DIR *replication_dir;
2405 struct dirent *replication_de;
2406
2407 elog(DEBUG1, "starting up replication slots");
2408
2409 /* restore all slots by iterating over all on-disk entries */
2410 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2411 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2412 {
2413 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2414 PGFileType de_type;
2415
2416 if (strcmp(replication_de->d_name, ".") == 0 ||
2417 strcmp(replication_de->d_name, "..") == 0)
2418 continue;
2419
2420 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2421 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2422
2423 /* we're only creating directories here, skip if it's not ours */
2424 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2425 continue;
2426
2427 /* we crashed while a slot was being setup or deleted, clean up */
2428 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2429 {
2430 if (!rmtree(path, true))
2431 {
2433 (errmsg("could not remove directory \"%s\"",
2434 path)));
2435 continue;
2436 }
2438 continue;
2439 }
2440
2441 /* looks like a slot in a normal state, restore */
2442 RestoreSlotFromDisk(replication_de->d_name);
2443 }
2444 FreeDir(replication_dir);
2445
2446 /* currently no slots exist, we're done. */
2448 return;
2449
2450 /* Now that we have recovered all the data, compute replication xmin */
2453}
2454
2455/* ----
2456 * Manipulation of on-disk state of replication slots
2457 *
2458 * NB: none of the routines below should take any notice whether a slot is the
2459 * current one or not, that's all handled a layer above.
2460 * ----
2461 */
2462static void
2464{
2465 char tmppath[MAXPGPATH];
2466 char path[MAXPGPATH];
2467 struct stat st;
2468
2469 /*
2470 * No need to take out the io_in_progress_lock, nobody else can see this
2471 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2472 * takes out the lock, if we'd take the lock here, we'd deadlock.
2473 */
2474
2475 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2476 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2477
2478 /*
2479 * It's just barely possible that some previous effort to create or drop a
2480 * slot with this name left a temp directory lying around. If that seems
2481 * to be the case, try to remove it. If the rmtree() fails, we'll error
2482 * out at the MakePGDirectory() below, so we don't bother checking
2483 * success.
2484 */
2485 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2486 rmtree(tmppath, true);
2487
2488 /* Create and fsync the temporary slot directory. */
2489 if (MakePGDirectory(tmppath) < 0)
2490 ereport(ERROR,
2492 errmsg("could not create directory \"%s\": %m",
2493 tmppath)));
2494 fsync_fname(tmppath, true);
2495
2496 /* Write the actual state file. */
2497 slot->dirty = true; /* signal that we really need to write */
2498 SaveSlotToPath(slot, tmppath, ERROR);
2499
2500 /* Rename the directory into place. */
2501 if (rename(tmppath, path) != 0)
2502 ereport(ERROR,
2504 errmsg("could not rename file \"%s\" to \"%s\": %m",
2505 tmppath, path)));
2506
2507 /*
2508 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2509 * would persist after an OS crash or not - so, force a restart. The
2510 * restart would try to fsync this again till it works.
2511 */
2513
2514 fsync_fname(path, true);
2516
2518}
2519
2520/*
2521 * Shared functionality between saving and creating a replication slot.
2522 */
2523static void
2524SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2525{
2526 char tmppath[MAXPGPATH];
2527 char path[MAXPGPATH];
2528 int fd;
2530 bool was_dirty;
2531
2532 /* first check whether there's something to write out */
2533 SpinLockAcquire(&slot->mutex);
2534 was_dirty = slot->dirty;
2535 slot->just_dirtied = false;
2536 SpinLockRelease(&slot->mutex);
2537
2538 /* and don't do anything if there's nothing to write */
2539 if (!was_dirty)
2540 return;
2541
2543
2544 /* silence valgrind :( */
2545 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2546
2547 sprintf(tmppath, "%s/state.tmp", dir);
2548 sprintf(path, "%s/state", dir);
2549
2550 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2551 if (fd < 0)
2552 {
2553 /*
2554 * If not an ERROR, then release the lock before returning. In case
2555 * of an ERROR, the error recovery path automatically releases the
2556 * lock, but no harm in explicitly releasing even in that case. Note
2557 * that LWLockRelease() could affect errno.
2558 */
2559 int save_errno = errno;
2560
2562 errno = save_errno;
2563 ereport(elevel,
2565 errmsg("could not create file \"%s\": %m",
2566 tmppath)));
2567 return;
2568 }
2569
2570 cp.magic = SLOT_MAGIC;
2572 cp.version = SLOT_VERSION;
2574
2575 SpinLockAcquire(&slot->mutex);
2576
2577 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2578
2579 SpinLockRelease(&slot->mutex);
2580
2584 FIN_CRC32C(cp.checksum);
2585
2586 errno = 0;
2587 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2588 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2589 {
2590 int save_errno = errno;
2591
2594 unlink(tmppath);
2596
2597 /* if write didn't set errno, assume problem is no disk space */
2598 errno = save_errno ? save_errno : ENOSPC;
2599 ereport(elevel,
2601 errmsg("could not write to file \"%s\": %m",
2602 tmppath)));
2603 return;
2604 }
2606
2607 /* fsync the temporary file */
2608 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2609 if (pg_fsync(fd) != 0)
2610 {
2611 int save_errno = errno;
2612
2615 unlink(tmppath);
2617
2618 errno = save_errno;
2619 ereport(elevel,
2621 errmsg("could not fsync file \"%s\": %m",
2622 tmppath)));
2623 return;
2624 }
2626
2627 if (CloseTransientFile(fd) != 0)
2628 {
2629 int save_errno = errno;
2630
2631 unlink(tmppath);
2633
2634 errno = save_errno;
2635 ereport(elevel,
2637 errmsg("could not close file \"%s\": %m",
2638 tmppath)));
2639 return;
2640 }
2641
2642 /* rename to permanent file, fsync file and directory */
2643 if (rename(tmppath, path) != 0)
2644 {
2645 int save_errno = errno;
2646
2647 unlink(tmppath);
2649
2650 errno = save_errno;
2651 ereport(elevel,
2653 errmsg("could not rename file \"%s\" to \"%s\": %m",
2654 tmppath, path)));
2655 return;
2656 }
2657
2658 /*
2659 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2660 */
2662
2663 fsync_fname(path, false);
2664 fsync_fname(dir, true);
2666
2668
2669 /*
2670 * Successfully wrote, unset dirty bit, unless somebody dirtied again
2671 * already and remember the confirmed_flush LSN value.
2672 */
2673 SpinLockAcquire(&slot->mutex);
2674 if (!slot->just_dirtied)
2675 slot->dirty = false;
2678 SpinLockRelease(&slot->mutex);
2679
2681}
2682
2683/*
2684 * Load a single slot from disk into memory.
2685 */
2686static void
2687RestoreSlotFromDisk(const char *name)
2688{
2690 int i;
2691 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2692 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2693 int fd;
2694 bool restored = false;
2695 int readBytes;
2696 pg_crc32c checksum;
2697 TimestampTz now = 0;
2698
2699 /* no need to lock here, no concurrent access allowed yet */
2700
2701 /* delete temp file if it exists */
2702 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2703 sprintf(path, "%s/state.tmp", slotdir);
2704 if (unlink(path) < 0 && errno != ENOENT)
2705 ereport(PANIC,
2707 errmsg("could not remove file \"%s\": %m", path)));
2708
2709 sprintf(path, "%s/state", slotdir);
2710
2711 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2712
2713 /* on some operating systems fsyncing a file requires O_RDWR */
2714 fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2715
2716 /*
2717 * We do not need to handle this as we are rename()ing the directory into
2718 * place only after we fsync()ed the state file.
2719 */
2720 if (fd < 0)
2721 ereport(PANIC,
2723 errmsg("could not open file \"%s\": %m", path)));
2724
2725 /*
2726 * Sync state file before we're reading from it. We might have crashed
2727 * while it wasn't synced yet and we shouldn't continue on that basis.
2728 */
2729 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2730 if (pg_fsync(fd) != 0)
2731 ereport(PANIC,
2733 errmsg("could not fsync file \"%s\": %m",
2734 path)));
2736
2737 /* Also sync the parent directory */
2739 fsync_fname(slotdir, true);
2741
2742 /* read part of statefile that's guaranteed to be version independent */
2743 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2744 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2746 if (readBytes != ReplicationSlotOnDiskConstantSize)
2747 {
2748 if (readBytes < 0)
2749 ereport(PANIC,
2751 errmsg("could not read file \"%s\": %m", path)));
2752 else
2753 ereport(PANIC,
2755 errmsg("could not read file \"%s\": read %d of %zu",
2756 path, readBytes,
2758 }
2759
2760 /* verify magic */
2761 if (cp.magic != SLOT_MAGIC)
2762 ereport(PANIC,
2764 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2765 path, cp.magic, SLOT_MAGIC)));
2766
2767 /* verify version */
2768 if (cp.version != SLOT_VERSION)
2769 ereport(PANIC,
2771 errmsg("replication slot file \"%s\" has unsupported version %u",
2772 path, cp.version)));
2773
2774 /* boundary check on length */
2776 ereport(PANIC,
2778 errmsg("replication slot file \"%s\" has corrupted length %u",
2779 path, cp.length)));
2780
2781 /* Now that we know the size, read the entire file */
2782 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2783 readBytes = read(fd,
2784 (char *) &cp + ReplicationSlotOnDiskConstantSize,
2785 cp.length);
2787 if (readBytes != cp.length)
2788 {
2789 if (readBytes < 0)
2790 ereport(PANIC,
2792 errmsg("could not read file \"%s\": %m", path)));
2793 else
2794 ereport(PANIC,
2796 errmsg("could not read file \"%s\": read %d of %zu",
2797 path, readBytes, (Size) cp.length)));
2798 }
2799
2800 if (CloseTransientFile(fd) != 0)
2801 ereport(PANIC,
2803 errmsg("could not close file \"%s\": %m", path)));
2804
2805 /* now verify the CRC */
2806 INIT_CRC32C(checksum);
2807 COMP_CRC32C(checksum,
2810 FIN_CRC32C(checksum);
2811
2812 if (!EQ_CRC32C(checksum, cp.checksum))
2813 ereport(PANIC,
2814 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2815 path, checksum, cp.checksum)));
2816
2817 /*
2818 * If we crashed with an ephemeral slot active, don't restore but delete
2819 * it.
2820 */
2822 {
2823 if (!rmtree(slotdir, true))
2824 {
2826 (errmsg("could not remove directory \"%s\"",
2827 slotdir)));
2828 }
2830 return;
2831 }
2832
2833 /*
2834 * Verify that requirements for the specific slot type are met. That's
2835 * important because if these aren't met we're not guaranteed to retain
2836 * all the necessary resources for the slot.
2837 *
2838 * NB: We have to do so *after* the above checks for ephemeral slots,
2839 * because otherwise a slot that shouldn't exist anymore could prevent
2840 * restarts.
2841 *
2842 * NB: Changing the requirements here also requires adapting
2843 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2844 */
2845 if (cp.slotdata.database != InvalidOid)
2846 {
2848 ereport(FATAL,
2849 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2850 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2851 NameStr(cp.slotdata.name)),
2852 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2853
2854 /*
2855 * In standby mode, the hot standby must be enabled. This check is
2856 * necessary to ensure logical slots are invalidated when they become
2857 * incompatible due to insufficient wal_level. Otherwise, if the
2858 * primary reduced effective_wal_level below logical (thereby
2859 * disabling logical decoding) while hot standby is disabled here,
2860 * logical slots would remain valid even after promotion.
2861 */
2863 ereport(FATAL,
2864 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2865 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2866 NameStr(cp.slotdata.name)),
2867 errhint("Change \"hot_standby\" to be \"on\".")));
2868 }
2869 else if (wal_level < WAL_LEVEL_REPLICA)
2870 ereport(FATAL,
2871 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2872 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2873 NameStr(cp.slotdata.name)),
2874 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2875
2876 /*
2877 * Nothing can be active yet, don't lock anything. Note we iterate up to
2878 * max_replication_slots instead of adding max_repack_replication_slots as
2879 * in all other places, because we must enforce the GUC value in case
2880 * there were more slots before the shutdown than the setting now
2881 * allows.
2882 */
2883 for (i = 0; i < max_replication_slots; i++)
2884 {
2885 ReplicationSlot *slot;
2886
2888
2889 if (slot->in_use)
2890 continue;
2891
2892 /* restore the entire set of persistent data */
2893 memcpy(&slot->data, &cp.slotdata,
2895
2896 /* initialize in memory state */
2897 slot->effective_xmin = cp.slotdata.xmin;
2901
2906
2907 slot->in_use = true;
2909
2910 /*
2911 * Set the time since the slot has become inactive after loading the
2912 * slot from the disk into memory. Whoever acquires the slot i.e.
2913 * makes the slot active will reset it. Use the same inactive_since
2914 * time for all the slots.
2915 */
2916 if (now == 0)
2918
2920
2921 restored = true;
2922 break;
2923 }
2924
2925 if (!restored)
2926 ereport(FATAL,
2927 (errmsg("too many replication slots active before shutdown"),
2928 errhint("Increase \"max_replication_slots\" and try again.")));
2929}
2930
2931/*
2932 * Maps an invalidation reason for a replication slot to
2933 * ReplicationSlotInvalidationCause.
2934 */
2936GetSlotInvalidationCause(const char *cause_name)
2937{
2938 Assert(cause_name);
2939
2940 /* Search lookup table for the cause having this name */
2941 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2942 {
2943 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2945 }
2946
2947 Assert(false);
2948 return RS_INVAL_NONE; /* to keep compiler quiet */
2949}
2950
2951/*
2952 * Maps a ReplicationSlotInvalidationCause to the invalidation
2953 * reason for a replication slot.
2954 */
2955const char *
2957{
2958 /* Search lookup table for the name of this cause */
2959 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2960 {
2961 if (SlotInvalidationCauses[i].cause == cause)
2963 }
2964
2965 Assert(false);
2966 return "none"; /* to keep compiler quiet */
2967}
2968
2969/*
2970 * A helper function to validate slots specified in GUC synchronized_standby_slots.
2971 *
2972 * The rawname will be parsed, and the result will be saved into *elemlist.
2973 */
2974static bool
2975validate_sync_standby_slots(char *rawname, List **elemlist)
2976{
2977 /* Verify syntax and parse string into a list of identifiers */
2978 if (!SplitIdentifierString(rawname, ',', elemlist))
2979 {
2980 GUC_check_errdetail("List syntax is invalid.");
2981 return false;
2982 }
2983
2984 /* Iterate the list to validate each slot name */
2985 foreach_ptr(char, name, *elemlist)
2986 {
2987 int err_code;
2988 char *err_msg = NULL;
2989 char *err_hint = NULL;
2990
2991 if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2992 &err_msg, &err_hint))
2993 {
2994 GUC_check_errcode(err_code);
2995 GUC_check_errdetail("%s", err_msg);
2996 if (err_hint != NULL)
2997 GUC_check_errhint("%s", err_hint);
2998 return false;
2999 }
3000 }
3001
3002 return true;
3003}
3004
3005/*
3006 * GUC check_hook for synchronized_standby_slots
3007 */
3008bool
3010{
3011 char *rawname;
3012 char *ptr;
3013 List *elemlist;
3014 int size;
3015 bool ok;
3017
3018 if ((*newval)[0] == '\0')
3019 return true;
3020
3021 /* Need a modifiable copy of the GUC string */
3022 rawname = pstrdup(*newval);
3023
3024 /* Now verify the list syntax and that each element is a valid slot name */
3025 ok = validate_sync_standby_slots(rawname, &elemlist);
3026
3027 if (!ok || elemlist == NIL)
3028 {
3029 pfree(rawname);
3030 list_free(elemlist);
3031 return ok;
3032 }
3033
3034 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3035 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3036 foreach_ptr(char, slot_name, elemlist)
3037 size += strlen(slot_name) + 1;
3038
3039 /* GUC extra value must be guc_malloc'd, not palloc'd */
3040 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3041 if (!config)
3042 return false;
3043
3044 /* Transform the data into SyncStandbySlotsConfigData */
3045 config->nslotnames = list_length(elemlist);
3046
3047 ptr = config->slot_names;
3048 foreach_ptr(char, slot_name, elemlist)
3049 {
3050 strcpy(ptr, slot_name);
3051 ptr += strlen(slot_name) + 1;
3052 }
3053
3054 *extra = config;
3055
3056 pfree(rawname);
3057 list_free(elemlist);
3058 return true;
3059}
3060
3061/*
3062 * GUC assign_hook for synchronized_standby_slots
3063 */
3064void
3065assign_synchronized_standby_slots(const char *newval, void *extra)
3066{
3067 /*
3068 * The standby slots may have changed, so we must recompute the oldest
3069 * LSN.
3070 */
3072
3074}
3075
3076/*
3077 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3078 */
3079bool
3080SlotExistsInSyncStandbySlots(const char *slot_name)
3081{
3082 const char *standby_slot_name;
3083
3084 /* Return false if there is no value in synchronized_standby_slots */
3086 return false;
3087
3088 /*
3089 * XXX: We are not expecting this list to be long so a linear search
3090 * shouldn't hurt but if that turns out not to be true then we can cache
3091 * this information for each WalSender as well.
3092 */
3093 standby_slot_name = synchronized_standby_slots_config->slot_names;
3094 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3095 {
3096 if (strcmp(standby_slot_name, slot_name) == 0)
3097 return true;
3098
3099 standby_slot_name += strlen(standby_slot_name) + 1;
3100 }
3101
3102 return false;
3103}
3104
3105/*
3106 * Return true if the slots specified in synchronized_standby_slots have caught up to
3107 * the given WAL location, false otherwise.
3108 *
3109 * The elevel parameter specifies the error level used for logging messages
3110 * related to slots that do not exist, are invalidated, or are inactive.
3111 */
3112bool
3113StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
3114{
3115 const char *name;
3116 int caught_up_slot_num = 0;
3117 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
3118
3119 /*
3120 * Don't need to wait for the standbys to catch up if there is no value in
3121 * synchronized_standby_slots.
3122 */
3124 return true;
3125
3126 /*
3127 * Don't need to wait for the standbys to catch up if we are on a standby
3128 * server, since we do not support syncing slots to cascading standbys.
3129 */
3130 if (RecoveryInProgress())
3131 return true;
3132
3133 /*
3134 * Don't need to wait for the standbys to catch up if they are already
3135 * beyond the specified WAL location.
3136 */
3138 ss_oldest_flush_lsn >= wait_for_lsn)
3139 return true;
3140
3141 /*
3142 * To prevent concurrent slot dropping and creation while filtering the
3143 * slots, take the ReplicationSlotControlLock outside of the loop.
3144 */
3145 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3146
3148 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3149 {
3150 XLogRecPtr restart_lsn;
3151 bool invalidated;
3152 bool inactive;
3153 ReplicationSlot *slot;
3154
3155 slot = SearchNamedReplicationSlot(name, false);
3156
3157 /*
3158 * If a slot name provided in synchronized_standby_slots does not
3159 * exist, report a message and exit the loop.
3160 */
3161 if (!slot)
3162 {
3163 ereport(elevel,
3164 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3165 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3166 name, "synchronized_standby_slots"),
3167 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3168 name),
3169 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3170 name, "synchronized_standby_slots"));
3171 break;
3172 }
3173
3174 /* Same as above: if a slot is not physical, exit the loop. */
3175 if (SlotIsLogical(slot))
3176 {
3177 ereport(elevel,
3178 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3179 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3180 name, "synchronized_standby_slots"),
3181 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3182 name),
3183 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3184 name, "synchronized_standby_slots"));
3185 break;
3186 }
3187
3188 SpinLockAcquire(&slot->mutex);
3189 restart_lsn = slot->data.restart_lsn;
3190 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3191 inactive = slot->active_proc == INVALID_PROC_NUMBER;
3192 SpinLockRelease(&slot->mutex);
3193
3194 if (invalidated)
3195 {
3196 /* Specified physical slot has been invalidated */
3197 ereport(elevel,
3198 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3199 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3200 name, "synchronized_standby_slots"),
3201 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3202 name),
3203 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3204 name, "synchronized_standby_slots"));
3205 break;
3206 }
3207
3208 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3209 {
3210 /* Log a message if no active_pid for this physical slot */
3211 if (inactive)
3212 ereport(elevel,
3213 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3214 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3215 name, "synchronized_standby_slots"),
3216 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3217 name),
3218 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3219 name, "synchronized_standby_slots"));
3220
3221 /* Exit the loop if the current slot hasn't caught up. */
3222 break;
3223 }
3224
3225 Assert(restart_lsn >= wait_for_lsn);
3226
3227 if (!XLogRecPtrIsValid(min_restart_lsn) ||
3228 min_restart_lsn > restart_lsn)
3229 min_restart_lsn = restart_lsn;
3230
3231 caught_up_slot_num++;
3232
3233 name += strlen(name) + 1;
3234 }
3235
3236 LWLockRelease(ReplicationSlotControlLock);
3237
3238 /*
3239 * Return false if not all the standbys have caught up to the specified
3240 * WAL location.
3241 */
3242 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3243 return false;
3244
3245 /* The ss_oldest_flush_lsn must not retreat. */
3247 min_restart_lsn >= ss_oldest_flush_lsn);
3248
3249 ss_oldest_flush_lsn = min_restart_lsn;
3250
3251 return true;
3252}
3253
3254/*
3255 * Wait for physical standbys to confirm receiving the given lsn.
3256 *
3257 * Used by logical decoding SQL functions. It waits for physical standbys
3258 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3259 */
3260void
3262{
3263 /*
3264 * Don't need to wait for the standby to catch up if the current acquired
3265 * slot is not a logical failover slot, or there is no value in
3266 * synchronized_standby_slots.
3267 */
3269 return;
3270
3272
3273 for (;;)
3274 {
3276
3278 {
3279 ConfigReloadPending = false;
3281 }
3282
3283 /* Exit if done waiting for every slot. */
3284 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3285 break;
3286
3287 /*
3288 * Wait for the slots in the synchronized_standby_slots to catch up,
3289 * but use a timeout (1s) so we can also check if the
3290 * synchronized_standby_slots has been changed.
3291 */
3293 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3294 }
3295
3297}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition timestamp.c:1715
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition timestamp.c:1789
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
#define NameStr(name)
Definition c.h:835
#define ngettext(s, p, n)
Definition c.h:1270
#define Assert(condition)
Definition c.h:943
#define PG_BINARY
Definition c.h:1374
uint64_t uint64
Definition c.h:625
#define pg_unreachable()
Definition c.h:367
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
size_t Size
Definition c.h:689
uint32 result
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1322
int errcode_for_file_access(void)
Definition elog.c:897
int errcode(int sqlerrcode)
Definition elog.c:874
#define _(x)
Definition elog.c:95
#define LOG
Definition elog.h:32
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define FATAL
Definition elog.h:42
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
int int errhint_internal(const char *fmt,...) pg_attribute_printf(1
int MakePGDirectory(const char *directoryName)
Definition fd.c:3963
int FreeDir(DIR *dir)
Definition fd.c:3009
int CloseTransientFile(int fd)
Definition fd.c:2855
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:757
DIR * AllocateDir(const char *dirname)
Definition fd.c:2891
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2957
int pg_fsync(int fd)
Definition fd.c:390
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_DIR
Definition file_utils.h:23
@ PGFILETYPE_ERROR
Definition file_utils.h:20
bool IsBinaryUpgrade
Definition globals.c:123
ProcNumber MyProcNumber
Definition globals.c:92
bool IsUnderPostmaster
Definition globals.c:122
Oid MyDatabaseId
Definition globals.c:96
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
void GUC_check_errcode(int sqlerrcode)
Definition guc.c:6666
void * guc_malloc(int elevel, size_t size)
Definition guc.c:637
#define newval
#define GUC_check_errdetail
Definition guc.h:507
GucSource
Definition guc.h:112
@ PGC_SIGHUP
Definition guc.h:75
#define GUC_check_errhint
Definition guc.h:511
#define IS_INJECTION_POINT_ATTACHED(name)
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int i
Definition isn.c:77
bool IsLogicalLauncher(void)
Definition launcher.c:1587
void list_free(List *list)
Definition list.c:1546
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:431
bool LWLockHeldByMe(LWLock *lock)
Definition lwlock.c:1885
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1929
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:670
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
#define START_CRIT_SECTION()
Definition miscadmin.h:152
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
@ B_STARTUP
Definition miscadmin.h:377
#define END_CRIT_SECTION()
Definition miscadmin.h:154
Oid GetUserId(void)
Definition miscinit.c:470
BackendType MyBackendType
Definition miscinit.c:65
bool has_rolreplication(Oid roleid)
Definition miscinit.c:689
void namestrcpy(Name name, const char *str)
Definition name.c:233
static char * errmsg
#define ERRCODE_DATA_CORRUPTED
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:173
#define EQ_CRC32C(c1, c2)
Definition pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:178
const void size_t len
const void * data
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static bool two_phase
static bool failover
static rewind_source * source
Definition pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition port.h:262
#define snprintf
Definition port.h:260
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
#define GetPGProcByNumber(n)
Definition proc.h:504
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition procarray.c:3963
bool SignalRecoveryConflict(PGPROC *proc, pid_t pid, RecoveryConflictReason reason)
Definition procarray.c:3467
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50
Size add_size(Size s1, Size s2)
Definition shmem.c:1048
Size mul_size(Size s1, Size s2)
Definition shmem.c:1063
#define ShmemRequestStruct(...)
Definition shmem.h:176
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition slot.c:581
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:629
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition slot.c:1980
static void ReplicationSlotsShmemInit(void *arg)
Definition slot.c:221
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition slot.c:115
char * synchronized_standby_slots
Definition slot.c:176
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition slot.c:3065
#define ReplicationSlotOnDiskChecksummedSize
Definition slot.c:137
void CheckPointReplicationSlots(bool is_shutdown)
Definition slot.c:2324
int idle_replication_slot_timeout_secs
Definition slot.c:170
void ReplicationSlotDropAcquired(void)
Definition slot.c:1042
void ReplicationSlotMarkDirty(void)
Definition slot.c:1184
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition slot.c:1888
void ReplicationSlotReserveWal(void)
Definition slot.c:1711
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition slot.c:1457
static XLogRecPtr ss_oldest_flush_lsn
Definition slot.c:185
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:310
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition slot.c:1518
#define ReplicationSlotOnDiskNotChecksummedSize
Definition slot.c:134
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1378
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition slot.c:2936
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1226
static void RestoreSlotFromDisk(const char *name)
Definition slot.c:2687
void ReplicationSlotPersist(void)
Definition slot.c:1201
bool CheckLogicalSlotExists(void)
Definition slot.c:1623
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition slot.c:1790
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition slot.c:2524
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:920
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3080
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition slot.c:2975
void ReplicationSlotSave(void)
Definition slot.c:1166
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:548
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition slot.c:2463
#define ReplicationSlotOnDiskV2Size
Definition slot.c:140
void CheckSlotPermissions(void)
Definition slot.c:1694
bool ReplicationSlotName(int index, Name name)
Definition slot.c:598
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition slot.c:3009
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:265
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:960
void ReplicationSlotRelease(void)
Definition slot.c:769
int max_replication_slots
Definition slot.c:161
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:147
#define SLOT_VERSION
Definition slot.c:144
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition slot.c:3261
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3113
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1308
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:868
int max_repack_replication_slots
Definition slot.c:163
void ReplicationSlotInitialize(void)
Definition slot.c:240
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition slot.c:1059
void StartupReplicationSlots(void)
Definition slot.c:2402
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition slot.c:1872
void CheckSlotRequirements(bool repack)
Definition slot.c:1665
#define SLOT_MAGIC
Definition slot.c:143
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition slot.c:2220
static void ReplicationSlotsShmemRequest(void *arg)
Definition slot.c:200
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition slot.c:179
#define ReplicationSlotOnDiskConstantSize
Definition slot.c:131
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2956
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition slot.c:249
static bool IsSlotForConflictCheck(const char *name)
Definition slot.c:360
#define CONFLICT_DETECTION_SLOT
Definition slot.h:28
#define RS_INVAL_MAX_CAUSES
Definition slot.h:72
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
#define SlotIsPhysical(slot)
Definition slot.h:287
#define PG_REPLSLOT_DIR
Definition slot.h:21
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60
#define SlotIsLogical(slot)
Definition slot.h:288
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:306
@ SS_SKIP_NONE
Definition slot.h:82
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1897
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
PGPROC * MyProc
Definition proc.c:71
PROC_HDR * ProcGlobal
Definition proc.c:74
XLogRecPtr LogStandbySnapshot(Oid dbid)
Definition standby.c:1303
@ RECOVERY_CONFLICT_LOGICALSLOT
Definition standby.h:46
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition dirent.c:26
Definition pg_list.h:54
uint8 statusFlags
Definition proc.h:210
int pgxactoff
Definition proc.h:207
uint8 * statusFlags
Definition proc.h:456
ReplicationSlot replication_slots[1]
Definition slot.h:299
ReplicationSlotPersistentData slotdata
Definition slot.c:85
pg_crc32c checksum
Definition slot.c:74
TransactionId catalog_xmin
Definition slot.h:122
ReplicationSlotPersistency persistency
Definition slot.h:106
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
XLogRecPtr candidate_xmin_lsn
Definition slot.h:229
TransactionId effective_catalog_xmin
Definition slot.h:210
slock_t mutex
Definition slot.h:183
XLogRecPtr candidate_restart_valid
Definition slot.h:230
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:238
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:284
bool in_use
Definition slot.h:186
TransactionId effective_xmin
Definition slot.h:209
bool just_dirtied
Definition slot.h:195
XLogRecPtr last_saved_restart_lsn
Definition slot.h:271
XLogRecPtr candidate_restart_lsn
Definition slot.h:231
LWLock io_in_progress_lock
Definition slot.h:216
ConditionVariable active_cv
Definition slot.h:219
TransactionId candidate_catalog_xmin
Definition slot.h:228
ProcNumber active_proc
Definition slot.h:192
ReplicationSlotPersistentData data
Definition slot.h:213
TimestampTz inactive_since
Definition slot.h:245
const char * cause_name
Definition slot.c:112
ReplicationSlotInvalidationCause cause
Definition slot.c:111
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition slot.c:103
ConditionVariable wal_confirm_rcv_cv
char d_name[MAX_PATH]
Definition dirent.h:15
Definition type.h:96
Definition c.h:830
#define InvalidTransactionId
Definition transam.h:31
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define TransactionIdIsValid(xid)
Definition transam.h:41
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition varlena.c:2867
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
const char * name
bool am_walsender
Definition walsender.c:135
bool log_replication_commands
Definition walsender.c:150
WalSndCtlData * WalSndCtl
Definition walsender.c:121
#define stat
Definition win32_port.h:74
#define S_ISDIR(m)
Definition win32_port.h:315
#define kill(pid, sig)
Definition win32_port.h:490
bool RecoveryInProgress(void)
Definition xlog.c:6830
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3813
bool EnableHotStandby
Definition xlog.c:128
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6933
int wal_level
Definition xlog.c:138
int wal_segment_size
Definition xlog.c:150
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition xlog.c:2687
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:10108
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
@ WAL_LEVEL_REPLICA
Definition xlog.h:77
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint64 XLogSegNo
Definition xlogdefs.h:52
bool StandbyMode
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

◆ ReplicationSlotOnDiskConstantSize

#define ReplicationSlotOnDiskConstantSize    offsetof(ReplicationSlotOnDisk, slotdata)

Definition at line 131 of file slot.c.

◆ ReplicationSlotOnDiskNotChecksummedSize

#define ReplicationSlotOnDiskNotChecksummedSize    offsetof(ReplicationSlotOnDisk, version)

Definition at line 134 of file slot.c.

◆ ReplicationSlotOnDiskV2Size

#define ReplicationSlotOnDiskV2Size    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize

Definition at line 140 of file slot.c.

◆ SLOT_MAGIC

#define SLOT_MAGIC   0x1051CA1 /* format identifier */

Definition at line 143 of file slot.c.

◆ SLOT_VERSION

#define SLOT_VERSION   5 /* version for new files */

Definition at line 144 of file slot.c.

Typedef Documentation

◆ ReplicationSlotOnDisk

◆ SlotInvalidationCauseMap

Function Documentation

◆ assign_synchronized_standby_slots()

void assign_synchronized_standby_slots ( const char *  newval,
void *  extra 
)

Definition at line 3065 of file slot.c.

3066{
3067 /*
3068 * The standby slots may have changed, so we must recompute the oldest
3069 * LSN.
3070 */
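3071 ss_oldest_flush_lsn = InvalidXLogRecPtr;
3072
3073 synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
3074}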

References InvalidXLogRecPtr, ss_oldest_flush_lsn, and synchronized_standby_slots_config.

◆ CanInvalidateIdleSlot()

◆ check_synchronized_standby_slots()

bool check_synchronized_standby_slots ( char **  newval,
void **  extra,
GucSource  source 
)

Definition at line 3009 of file slot.c.

3010{
3011 char *rawname;
3012 char *ptr;
3013 List *elemlist;
3014 int size;
3015 bool ok;
3016 SyncStandbySlotsConfigData *config;
3017
3018 if ((*newval)[0] == '\0')
3019 return true;
3020
3021 /* Need a modifiable copy of the GUC string */
3022 rawname = pstrdup(*newval);
3023
3024 /* Now verify if the specified slots exist and have correct type */
3025 ok = validate_sync_standby_slots(rawname, &elemlist);
3026
3027 if (!ok || elemlist == NIL)
3028 {
3029 pfree(rawname);
3030 list_free(elemlist);
3031 return ok;
3032 }
3033
3034 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3035 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3036 foreach_ptr(char, slot_name, elemlist)
3037 size += strlen(slot_name) + 1;
3038
3039 /* GUC extra value must be guc_malloc'd, not palloc'd */
3040 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3041 if (!config)
3042 return false;
3043
3044 /* Transform the data into SyncStandbySlotsConfigData */
3045 config->nslotnames = list_length(elemlist);
3046
3047 ptr = config->slot_names;
3048 foreach_ptr(char, slot_name, elemlist)
3049 {
3050 strcpy(ptr, slot_name);
3051 ptr += strlen(slot_name) + 1;
3052 }
3053
3054 *extra = config;
3055
3056 pfree(rawname);
3057 list_free(elemlist);
3058 return true;
3059}

References foreach_ptr, guc_malloc(), list_free(), list_length(), LOG, newval, NIL, SyncStandbySlotsConfigData::nslotnames, pfree(), pstrdup(), SyncStandbySlotsConfigData::slot_names, and validate_sync_standby_slots().

◆ CheckLogicalSlotExists()

bool CheckLogicalSlotExists ( void  )

Definition at line 1623 of file slot.c.

1624{
1625 bool found = false;
1626
1628 return false;
1629
1630 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1632 {
1633 ReplicationSlot *s;
1634 bool invalidated;
1635
1636 s = &ReplicationSlotCtl->replication_slots[i];
1637
1638 /* cannot change while ReplicationSlotCtlLock is held */
1639 if (!s->in_use)
1640 continue;
1641
1642 if (SlotIsPhysical(s))
1643 continue;
1644
1645 SpinLockAcquire(&s->mutex);
1646 invalidated = s->data.invalidated != RS_INVAL_NONE;
1647 SpinLockRelease(&s->mutex);
1648
1649 if (invalidated)
1650 continue;
1651
1652 found = true;
1653 break;
1654 }
1655 LWLockRelease(ReplicationSlotControlLock);
1656
1657 return found;
1658}

References ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SlotIsPhysical, SpinLockAcquire(), and SpinLockRelease().

Referenced by DisableLogicalDecoding(), and UpdateLogicalDecodingStatusEndOfRecovery().

◆ CheckPointReplicationSlots()

void CheckPointReplicationSlots ( bool  is_shutdown)

Definition at line 2324 of file slot.c.

2325{
2326 int i;
2327 bool last_saved_restart_lsn_updated = false;
2328
2329 elog(DEBUG1, "performing replication slot checkpoint");
2330
2331 /*
2332 * Prevent any slot from being created/dropped while we're active. As we
2333 * explicitly do *not* want to block iterating over replication_slots or
2334 * acquiring a slot we cannot take the control lock - but that's OK,
2335 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2336 * enough to guarantee that nobody can change the in_use bits on us.
2337 *
2338 * Additionally, acquiring the Allocation lock is necessary to serialize
2339 * the slot flush process with concurrent slot WAL reservation. This
2340 * ensures that the WAL position being reserved is either flushed to disk
2341 * or is beyond or equal to the redo pointer of the current checkpoint
2342 * (See ReplicationSlotReserveWal for details).
2343 */
2344 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2345
2347 {
2348 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2349 char path[MAXPGPATH];
2350
2351 if (!s->in_use)
2352 continue;
2353
2354 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2355 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2356
2357 /*
2358 * Slot's data is not flushed each time the confirmed_flush LSN is
2359 * updated as that could lead to frequent writes. However, we decide
2360 * to force a flush of all logical slot's data at the time of shutdown
2361 * if the confirmed_flush LSN is changed since we last flushed it to
2362 * disk. This helps in avoiding an unnecessary retreat of the
2363 * confirmed_flush LSN after restart.
2364 */
2365 if (is_shutdown && SlotIsLogical(s))
2366 {
2367 SpinLockAcquire(&s->mutex);
2368
2369 if (s->data.invalidated == RS_INVAL_NONE &&
2371 {
2372 s->just_dirtied = true;
2373 s->dirty = true;
2374 }
2375 SpinLockRelease(&s->mutex);
2376 }
2377
2378 /*
2379 * Track if we're going to update slot's last_saved_restart_lsn. We
2380 * need this to know if we need to recompute the required LSN.
2381 */
2382 if (s->last_saved_restart_lsn != s->data.restart_lsn)
2383 last_saved_restart_lsn_updated = true;
2384
2385 SaveSlotToPath(s, path, LOG);
2386 }
2387 LWLockRelease(ReplicationSlotAllocationLock);
2388
2389 /*
2390 * Recompute the required LSN if SaveSlotToPath() updated
2391 * last_saved_restart_lsn for any slot.
2392 */
2393 if (last_saved_restart_lsn_updated)
2394 ReplicationSlotsComputeRequiredLSN();
2395}

References ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, DEBUG1, ReplicationSlot::dirty, elog, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LOG, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SaveSlotToPath(), SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and sprintf.

Referenced by CheckPointGuts().

◆ CheckSlotPermissions()

void CheckSlotPermissions ( void  )

Definition at line 1694 of file slot.c.

1695{
1696 if (!has_rolreplication(GetUserId()))
1697 ereport(ERROR,
1698 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1699 errmsg("permission denied to use replication slots"),
1700 errdetail("Only roles with the %s attribute may use replication slots.",
1701 "REPLICATION")));
1702}

References ereport, errcode(), errdetail(), errmsg, ERROR, GetUserId(), and has_rolreplication().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), pg_sync_replication_slots(), and repack_setup_logical_decoding().

◆ CheckSlotRequirements()

void CheckSlotRequirements ( bool  repack)

Definition at line 1665 of file slot.c.

1666{
1667 /*
1668 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1669 * needs the same check.
1670 */
1671
1672 if (!repack && max_replication_slots == 0)
1673 ereport(ERROR,
1674 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1675 errmsg("replication slots can only be used if \"%s\" > 0",
1676 "max_replication_slots"));
1677
1678 if (repack && max_repack_replication_slots == 0)
1679 ereport(ERROR,
1680 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1681 errmsg("REPACK can only be used if \"%s\" > 0",
1682 "max_repack_replication_slots"));
1683
1684 if (wal_level < WAL_LEVEL_REPLICA)
1685 ereport(ERROR,
1686 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1687 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1688}

References ereport, errcode(), errmsg, ERROR, max_repack_replication_slots, max_replication_slots, wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckLogicalDecodingRequirements(), copy_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

◆ CreateSlotOnDisk()

static void CreateSlotOnDisk ( ReplicationSlot slot)
static

Definition at line 2463 of file slot.c.

2464{
2465 char tmppath[MAXPGPATH];
2466 char path[MAXPGPATH];
2467 struct stat st;
2468
2469 /*
2470 * No need to take out the io_in_progress_lock, nobody else can see this
2471 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2472 * takes out the lock, if we'd take the lock here, we'd deadlock.
2473 */
2474
2475 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2476 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2477
2478 /*
2479 * It's just barely possible that some previous effort to create or drop a
2480 * slot with this name left a temp directory lying around. If that seems
2481 * to be the case, try to remove it. If the rmtree() fails, we'll error
2482 * out at the MakePGDirectory() below, so we don't bother checking
2483 * success.
2484 */
2485 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2486 rmtree(tmppath, true);
2487
2488 /* Create and fsync the temporary slot directory. */
2489 if (MakePGDirectory(tmppath) < 0)
2490 ereport(ERROR,
2491 (errcode_for_file_access(),
2492 errmsg("could not create directory \"%s\": %m",
2493 tmppath)));
2494 fsync_fname(tmppath, true);
2495
2496 /* Write the actual state file. */
2497 slot->dirty = true; /* signal that we really need to write */
2498 SaveSlotToPath(slot, tmppath, ERROR);
2499
2500 /* Rename the directory into place. */
2501 if (rename(tmppath, path) != 0)
2502 ereport(ERROR,
2503 (errcode_for_file_access(),
2504 errmsg("could not rename file \"%s\" to \"%s\": %m",
2505 tmppath, path)));
2506
2507 /*
2508 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2509 * would persist after an OS crash or not - so, force a restart. The
2510 * restart would try to fsync this again till it works.
2511 */
2512 START_CRIT_SECTION();
2513
2514 fsync_fname(path, true);
2515 fsync_fname(PG_REPLSLOT_DIR, true);
2516
2517 END_CRIT_SECTION();
2518}

References ReplicationSlot::data, ReplicationSlot::dirty, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg, ERROR, fsync_fname(), MakePGDirectory(), MAXPGPATH, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, rmtree(), S_ISDIR, SaveSlotToPath(), sprintf, stat::st_mode, START_CRIT_SECTION, and stat.

Referenced by ReplicationSlotCreate().

◆ DetermineSlotInvalidationCause()

static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause ( uint32  possible_causes,
ReplicationSlot s,
XLogRecPtr  oldestLSN,
Oid  dboid,
TransactionId  snapshotConflictHorizon,
TimestampTz inactive_since,
TimestampTz  now 
)
static

Definition at line 1888 of file slot.c.

1892{
1893 Assert(possible_causes != RS_INVAL_NONE);
1894
1895 if (possible_causes & RS_INVAL_WAL_REMOVED)
1896 {
1897 XLogRecPtr restart_lsn = s->data.restart_lsn;
1898
1899 if (XLogRecPtrIsValid(restart_lsn) &&
1900 restart_lsn < oldestLSN)
1901 return RS_INVAL_WAL_REMOVED;
1902 }
1903
1904 if (possible_causes & RS_INVAL_HORIZON)
1905 {
1906 /* invalid DB oid signals a shared relation */
1907 if (SlotIsLogical(s) &&
1908 (dboid == InvalidOid || dboid == s->data.database))
1909 {
1910 TransactionId effective_xmin = s->effective_xmin;
1911 TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1912
1913 if (TransactionIdIsValid(effective_xmin) &&
1914 TransactionIdPrecedesOrEquals(effective_xmin,
1915 snapshotConflictHorizon))
1916 return RS_INVAL_HORIZON;
1917 else if (TransactionIdIsValid(catalog_effective_xmin) &&
1918 TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1919 snapshotConflictHorizon))
1920 return RS_INVAL_HORIZON;
1921 }
1922 }
1923
1924 if (possible_causes & RS_INVAL_WAL_LEVEL)
1925 {
1926 if (SlotIsLogical(s))
1927 return RS_INVAL_WAL_LEVEL;
1928 }
1929
1930 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1931 {
1932 Assert(now > 0);
1933
1934 if (CanInvalidateIdleSlot(s))
1935 {
1936 /*
1937 * Simulate the invalidation due to idle_timeout to test the
1938 * timeout behavior promptly, without waiting for it to trigger
1939 * naturally.
1940 */
1941#ifdef USE_INJECTION_POINTS
1942 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1943 {
1944 *inactive_since = 0; /* since the beginning of time */
1945 return RS_INVAL_IDLE_TIMEOUT;
1946 }
1947#endif
1948
1949 /*
1950 * Check if the slot needs to be invalidated due to
1951 * idle_replication_slot_timeout GUC.
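1952 */
1953 if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1954 idle_replication_slot_timeout_secs))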
1955 {
1956 *inactive_since = s->inactive_since;
1957 return RS_INVAL_IDLE_TIMEOUT;
1958 }
1959 }
1960 }
1961
1962 return RS_INVAL_NONE;
1963}

References Assert, CanInvalidateIdleSlot(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, idle_replication_slot_timeout_secs, ReplicationSlot::inactive_since, InvalidOid, IS_INJECTION_POINT_ATTACHED, now(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_HORIZON, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, RS_INVAL_WAL_REMOVED, SlotIsLogical, TimestampDifferenceExceedsSeconds(), TransactionIdIsValid, TransactionIdPrecedesOrEquals(), and XLogRecPtrIsValid.

Referenced by InvalidatePossiblyObsoleteSlot().

◆ GetSlotInvalidationCause()

ReplicationSlotInvalidationCause GetSlotInvalidationCause ( const char *  cause_name)

Definition at line 2936 of file slot.c.

2937{
2938 Assert(cause_name);
2939
2940 /* Search lookup table for the cause having this name */
2941 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2942 {
2943 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2944 return SlotInvalidationCauses[i].cause;
2945 }
2946
2947 Assert(false);
2948 return RS_INVAL_NONE; /* to keep compiler quiet */
2949}

References Assert, SlotInvalidationCauseMap::cause, i, RS_INVAL_MAX_CAUSES, RS_INVAL_NONE, and SlotInvalidationCauses.

Referenced by fetch_remote_slots().

◆ GetSlotInvalidationCauseName()

const char * GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause  cause)

Definition at line 2956 of file slot.c.

2957{
2958 /* Search lookup table for the name of this cause */
2959 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2960 {
2961 if (SlotInvalidationCauses[i].cause == cause)
2962 return SlotInvalidationCauses[i].cause_name;
2963 }
2964
2965 Assert(false);
2966 return "none"; /* to keep compiler quiet */
2967}

References Assert, SlotInvalidationCauseMap::cause_name, i, RS_INVAL_MAX_CAUSES, and SlotInvalidationCauses.

Referenced by pg_get_replication_slots(), and ReplicationSlotAcquire().

◆ InvalidateObsoleteReplicationSlots()

bool InvalidateObsoleteReplicationSlots ( uint32  possible_causes,
XLogSegNo  oldestSegno,
Oid  dboid,
TransactionId  snapshotConflictHorizon 
)

Definition at line 2220 of file slot.c.

2223{
2224 XLogRecPtr oldestLSN;
2225 bool invalidated = false;
2226 bool invalidated_logical = false;
2227 bool found_valid_logicalslot;
2228
2229 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2230 Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2231 Assert(possible_causes != RS_INVAL_NONE);
2232
2234 return invalidated;
2235
2236 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2237
2238restart:
2239 found_valid_logicalslot = false;
2240 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2242 {
2244 bool released_lock = false;
2245
2246 if (!s->in_use)
2247 continue;
2248
2249 /* Prevent invalidation of logical slots during binary upgrade */
2251 {
2253 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2255
2256 continue;
2257 }
2258
2259 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2260 dboid, snapshotConflictHorizon,
2261 &released_lock))
2262 {
2263 Assert(released_lock);
2264
2265 /* Remember we have invalidated a physical or logical slot */
2266 invalidated = true;
2267
2268 /*
2269 * Additionally, remember we have invalidated a logical slot as we
2270 * can request disabling logical decoding later.
2271 */
2272 if (SlotIsLogical(s))
2273 invalidated_logical = true;
2274 }
2275 else
2276 {
2277 /*
2278 * We need to check if the slot is invalidated here since
2279 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2280 * is already invalidated.
2281 */
2283 found_valid_logicalslot |=
2286 }
2287
2288 /* if the lock was released, start from scratch */
2289 if (released_lock)
2290 goto restart;
2291 }
2292 LWLockRelease(ReplicationSlotControlLock);
2293
2294 /*
2295 * If any slots have been invalidated, recalculate the resource limits.
2296 */
2297 if (invalidated)
2298 {
2299 ReplicationSlotsComputeRequiredXmin(false);
2300 ReplicationSlotsComputeRequiredLSN();
2301 }
2302
2303 /*
2304 * Request the checkpointer to disable logical decoding if no valid
2305 * logical slots remain. If called by the checkpointer during a
2306 * checkpoint, only the request is initiated; actual deactivation is
2307 * deferred until after the checkpoint completes.
2308 */
2309 if (invalidated_logical && !found_valid_logicalslot)
2310 RequestDisableLogicalDecoding();
2311
2312 return invalidated;
2313}

References Assert, ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidatePossiblyObsoleteSlot(), IsBinaryUpgrade, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RequestDisableLogicalDecoding(), RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by CreateCheckPoint(), CreateRestartPoint(), ResolveRecoveryConflictWithSnapshot(), and xlog_redo().

◆ InvalidatePossiblyObsoleteSlot()

static bool InvalidatePossiblyObsoleteSlot ( uint32  possible_causes,
ReplicationSlot s,
XLogRecPtr  oldestLSN,
Oid  dboid,
TransactionId  snapshotConflictHorizon,
bool *  released_lock_out 
)
static

Definition at line 1980 of file slot.c.

1985{
1986 int last_signaled_pid = 0;
1987 bool released_lock = false;
1988 bool invalidated = false;
1989 TimestampTz inactive_since = 0;
1990
1991 for (;;)
1992 {
1993 XLogRecPtr restart_lsn;
1994 NameData slotname;
1995 ProcNumber active_proc;
1996 int active_pid = 0;
1998 TimestampTz now = 0;
1999 long slot_idle_secs = 0;
2000
2001 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
2002
2003 if (!s->in_use)
2004 {
2005 if (released_lock)
2006 LWLockRelease(ReplicationSlotControlLock);
2007 break;
2008 }
2009
2010 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
2011 {
2012 /*
2013 * Assign the current time here to avoid system call overhead
2014 * while holding the spinlock in subsequent code.
2015 */
2017 }
2018
2019 /*
2020 * Check if the slot needs to be invalidated. If it needs to be
2021 * invalidated, and is not currently acquired, acquire it and mark it
2022 * as having been invalidated. We do this with the spinlock held to
2023 * avoid race conditions -- for example the restart_lsn could move
2024 * forward, or the slot could be dropped.
2025 */
2027
2028 restart_lsn = s->data.restart_lsn;
2029
2030 /* we do nothing if the slot is already invalid */
2031 if (s->data.invalidated == RS_INVAL_NONE)
2032 invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
2033 s, oldestLSN,
2034 dboid,
2035 snapshotConflictHorizon,
2036 &inactive_since,
2037 now);
2038
2039 /* if there's no invalidation, we're done */
2040 if (invalidation_cause == RS_INVAL_NONE)
2041 {
2043 if (released_lock)
2044 LWLockRelease(ReplicationSlotControlLock);
2045 break;
2046 }
2047
2048 slotname = s->data.name;
2049 active_proc = s->active_proc;
2050
2051 /*
2052 * If the slot can be acquired, do so and mark it invalidated
2053 * immediately. Otherwise we'll signal the owning process, below, and
2054 * retry.
2055 *
2056 * Note: Unlike other slot attributes, slot's inactive_since can't be
2057 * changed until the acquired slot is released or the owning process
2058 * is terminated. So, the inactive slot can only be invalidated
2059 * immediately without being terminated.
2060 */
2061 if (active_proc == INVALID_PROC_NUMBER)
2062 {
2065 s->data.invalidated = invalidation_cause;
2066
2067 /*
2068 * XXX: We should consider not overwriting restart_lsn and instead
2069 * just rely on .invalidated.
2070 */
2071 if (invalidation_cause == RS_INVAL_WAL_REMOVED)
2072 {
2075 }
2076
2077 /* Let caller know */
2078 invalidated = true;
2079 }
2080 else
2081 {
2082 active_pid = GetPGProcByNumber(active_proc)->pid;
2083 Assert(active_pid != 0);
2084 }
2085
2087
2088 /*
2089 * Calculate the idle time duration of the slot if slot is marked
2090 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2091 */
2092 if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
2093 {
2094 int slot_idle_usecs;
2095
2096 TimestampDifference(inactive_since, now, &slot_idle_secs,
2097 &slot_idle_usecs);
2098 }
2099
2100 if (active_proc != INVALID_PROC_NUMBER)
2101 {
2102 /*
2103 * Prepare the sleep on the slot's condition variable before
2104 * releasing the lock, to close a possible race condition if the
2105 * slot is released before the sleep below.
2106 */
2108
2109 LWLockRelease(ReplicationSlotControlLock);
2110 released_lock = true;
2111
2112 /*
2113 * Signal to terminate the process that owns the slot, if we
2114 * haven't already signalled it. (Avoidance of repeated
2115 * signalling is the only reason for there to be a loop in this
2116 * routine; otherwise we could rely on caller's restart loop.)
2117 *
2118 * There is a race condition in which another process may acquire
2119 * the slot after its current owner is terminated but before this
2120 * process acquires it. To handle that, we signal only if the PID of
2121 * the owning process has changed since the previous time. (This
2122 * logic assumes that the same PID is not reused very quickly.)
2123 */
2124 if (last_signaled_pid != active_pid)
2125 {
2126 ReportSlotInvalidation(invalidation_cause, true, active_pid,
2127 slotname, restart_lsn,
2128 oldestLSN, snapshotConflictHorizon,
2129 slot_idle_secs);
2130
2131 if (MyBackendType == B_STARTUP)
2132 (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
2133 active_pid,
2135 else
2136 (void) kill(active_pid, SIGTERM);
2137
2138 last_signaled_pid = active_pid;
2139 }
2140
2141 /* Wait until the slot is released. */
2143 WAIT_EVENT_REPLICATION_SLOT_DROP);
2144
2145 /*
2146 * Re-acquire lock and start over; we expect to invalidate the
2147 * slot next time (unless another process acquires the slot in the
2148 * meantime).
2149 *
2150 * Note: It is possible for a slot to advance its restart_lsn or
2151 * xmin values sufficiently between when we release the mutex and
2152 * when we recheck, moving from a conflicting state to a
2153 * non-conflicting state. This is intentional and safe: if the slot
2154 * has caught up while we're busy here, the resources we were
2155 * concerned about (WAL segments or tuples) have not yet been
2156 * removed, and there's no reason to invalidate the slot.
2157 */
2158 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2159 continue;
2160 }
2161 else
2162 {
2163 /*
2164 * We hold the slot now and have already invalidated it; flush it
2165 * to ensure that state persists.
2166 *
2167 * Don't want to hold ReplicationSlotControlLock across file
2168 * system operations, so release it now but be sure to tell caller
2169 * to restart from scratch.
2170 */
2171 LWLockRelease(ReplicationSlotControlLock);
2172 released_lock = true;
2173
2174 /* Make sure the invalidated state persists across server restart */
2178
2179 ReportSlotInvalidation(invalidation_cause, false, active_pid,
2180 slotname, restart_lsn,
2181 oldestLSN, snapshotConflictHorizon,
2182 slot_idle_secs);
2183
2184 /* done with this slot for now */
2185 break;
2186 }
2187 }
2188
2189 Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2190
2191 *released_lock_out = released_lock;
2192 return invalidated;
2193}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, B_STARTUP, ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DetermineSlotInvalidationCause(), GetCurrentTimestamp(), GetPGProcByNumber, ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, kill, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockHeldByMeInMode(), LWLockRelease(), ReplicationSlot::mutex, MyBackendType, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::name, now(), RECOVERY_CONFLICT_LOGICALSLOT, ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReportSlotInvalidation(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SignalRecoveryConflict(), SpinLockAcquire(), SpinLockRelease(), and TimestampDifference().

Referenced by InvalidateObsoleteReplicationSlots().

◆ IsSlotForConflictCheck()

static bool IsSlotForConflictCheck ( const char *  name)
static

Definition at line 360 of file slot.c.

361{
362 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
363}

References CONFLICT_DETECTION_SLOT, and name.

Referenced by ReplicationSlotAcquire(), and ReplicationSlotValidateNameInternal().

◆ ReplicationSlotAcquire()

void ReplicationSlotAcquire ( const char *  name,
bool  nowait,
bool  error_if_invalid 
)

Definition at line 629 of file slot.c.

630{
632 ProcNumber active_proc;
633 int active_pid;
634
635 Assert(name != NULL);
636
637retry:
638 Assert(MyReplicationSlot == NULL);
639
640 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
641
642 /* Check if the slot exists with the given name. */
644 if (s == NULL || !s->in_use)
645 {
646 LWLockRelease(ReplicationSlotControlLock);
647
649 (errcode(ERRCODE_UNDEFINED_OBJECT),
650 errmsg("replication slot \"%s\" does not exist",
651 name)));
652 }
653
654 /*
655 * Do not allow users to acquire the reserved slot. This scenario may
656 * occur if the launcher that owns the slot has terminated unexpectedly
657 * due to an error, and a backend process attempts to reuse the slot.
658 */
661 errcode(ERRCODE_UNDEFINED_OBJECT),
662 errmsg("cannot acquire replication slot \"%s\"", name),
663 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
664
665 /*
666 * This is the slot we want; check if it's active under some other
667 * process. In single user mode, we don't need this check.
668 */
670 {
671 /*
672 * Get ready to sleep on the slot in case it is active. (We may end
673 * up not sleeping, but we don't want to do this while holding the
674 * spinlock.)
675 */
676 if (!nowait)
678
679 /*
680 * It is important to reset the inactive_since under spinlock here to
681 * avoid race conditions with slot invalidation. See comments related
682 * to inactive_since in InvalidatePossiblyObsoleteSlot.
683 */
687 active_proc = s->active_proc;
690 }
691 else
692 {
693 s->active_proc = active_proc = MyProcNumber;
695 }
696 active_pid = GetPGProcByNumber(active_proc)->pid;
697 LWLockRelease(ReplicationSlotControlLock);
698
699 /*
700 * If we found the slot but it's already active in another process, we
701 * wait until the owning process signals us that it's been released, or
702 * error out.
703 */
704 if (active_proc != MyProcNumber)
705 {
706 if (!nowait)
707 {
708 /* Wait here until we get signaled, and then restart */
710 WAIT_EVENT_REPLICATION_SLOT_DROP);
712 goto retry;
713 }
714
716 (errcode(ERRCODE_OBJECT_IN_USE),
717 errmsg("replication slot \"%s\" is active for PID %d",
718 NameStr(s->data.name), active_pid)));
719 }
720 else if (!nowait)
721 ConditionVariableCancelSleep(); /* no sleep needed after all */
722
723 /* We made this slot active, so it's ours now. */
725
726 /*
727 * We need to check for invalidation after making the slot ours to avoid
728 * the possible race condition with the checkpointer that can otherwise
729 * invalidate the slot immediately after the check.
730 */
731 if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
733 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
734 errmsg("can no longer access replication slot \"%s\"",
735 NameStr(s->data.name)),
736 errdetail("This replication slot has been invalidated due to \"%s\".",
738
739 /* Let everybody know we've modified this slot */
741
742 /*
743 * The call to pgstat_acquire_replslot() protects against stats for a
744 * different slot, from before a restart or such, being present during
745 * pgstat_report_replslot().
746 */
747 if (SlotIsLogical(s))
749
750
751 if (am_walsender)
752 {
755 ? errmsg("acquired logical replication slot \"%s\"",
756 NameStr(s->data.name))
757 : errmsg("acquired physical replication slot \"%s\"",
758 NameStr(s->data.name)));
759 }
760}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, am_walsender, Assert, ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DEBUG1, ereport, errcode(), errdetail(), errmsg, ERROR, GetPGProcByNumber, GetSlotInvalidationCauseName(), ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, IsLogicalLauncher(), IsSlotForConflictCheck(), IsUnderPostmaster, LOG, log_replication_commands, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, pgstat_acquire_replslot(), ReplicationSlotSetInactiveSince(), RS_INVAL_NONE, SearchNamedReplicationSlot(), SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by acquire_conflict_slot_if_exists(), binary_upgrade_check_logical_slot_pending_wal(), drop_local_obsolete_slots(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), ReplicationSlotAlter(), ReplicationSlotDrop(), StartLogicalReplication(), StartReplication(), and synchronize_one_slot().

◆ ReplicationSlotAlter()

void ReplicationSlotAlter ( const char *  name,
const bool *  failover,
const bool *  two_phase 
)

Definition at line 960 of file slot.c.

962{
963 bool update_slot = false;
964
965 Assert(MyReplicationSlot == NULL);
967
968 ReplicationSlotAcquire(name, false, true);
969
972 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
973 errmsg("cannot use %s with a physical replication slot",
974 "ALTER_REPLICATION_SLOT"));
975
976 if (RecoveryInProgress())
977 {
978 /*
979 * Do not allow users to alter the slots which are currently being
980 * synced from the primary to the standby.
981 */
984 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
985 errmsg("cannot alter replication slot \"%s\"", name),
986 errdetail("This replication slot is being synchronized from the primary server."));
987
988 /*
989 * Do not allow users to enable failover on the standby as we do not
990 * support sync to the cascading standby.
991 */
992 if (failover && *failover)
994 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
995 errmsg("cannot enable failover for a replication slot"
996 " on the standby"));
997 }
998
999 if (failover)
1000 {
1001 /*
1002 * Do not allow users to enable failover for temporary slots as we do
1003 * not support syncing temporary slots to the standby.
1004 */
1006 ereport(ERROR,
1007 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1008 errmsg("cannot enable failover for a temporary replication slot"));
1009
1011 {
1015
1016 update_slot = true;
1017 }
1018 }
1019
1021 {
1025
1026 update_slot = true;
1027 }
1028
1029 if (update_slot)
1030 {
1033 }
1034
1036}

References Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg, ERROR, failover, ReplicationSlotPersistentData::failover, ReplicationSlot::mutex, MyReplicationSlot, name, ReplicationSlotPersistentData::persistency, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), RS_TEMPORARY, SlotIsPhysical, SpinLockAcquire(), SpinLockRelease(), ReplicationSlotPersistentData::synced, two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by AlterReplicationSlot().

◆ ReplicationSlotCleanup()

void ReplicationSlotCleanup ( bool  synced_only)

Definition at line 868 of file slot.c.

869{
870 int i;
871 bool found_valid_logicalslot;
872 bool dropped_logical = false;
873
874 Assert(MyReplicationSlot == NULL);
875
876restart:
877 found_valid_logicalslot = false;
878 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
880 {
882
883 if (!s->in_use)
884 continue;
885
887
888 found_valid_logicalslot |=
890
891 if ((s->active_proc == MyProcNumber &&
892 (!synced_only || s->data.synced)))
893 {
896 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
897
898 if (SlotIsLogical(s))
899 dropped_logical = true;
900
902
904 goto restart;
905 }
906 else
908 }
909
910 LWLockRelease(ReplicationSlotControlLock);
911
912 if (dropped_logical && !found_valid_logicalslot)
914}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, ConditionVariableBroadcast(), ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropPtr(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and ReplicationSlotPersistentData::synced.

Referenced by PostgresMain(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), SyncReplicationSlots(), and WalSndErrorCleanup().

◆ ReplicationSlotCreate()

void ReplicationSlotCreate ( const char *  name,
bool  db_specific,
ReplicationSlotPersistency  persistency,
bool  two_phase,
bool  repack,
bool  failover,
bool  synced 
)

Definition at line 378 of file slot.c.

381{
382 ReplicationSlot *slot = NULL;
383 int startpoint,
384 endpoint;
385
386 Assert(MyReplicationSlot == NULL);
387
388 /*
389 * The logical launcher or pg_upgrade may create or migrate an internal
390 * slot, so using a reserved name is allowed in these cases.
391 */
393 ERROR);
394
395 if (failover)
396 {
397 /*
398 * Do not allow users to create the failover enabled slots on the
399 * standby as we do not support sync to the cascading standby.
400 *
401 * However, failover enabled slots can be created during slot
402 * synchronization because we need to retain the same values as the
403 * remote slot.
404 */
407 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408 errmsg("cannot enable failover for a replication slot created on the standby"));
409
410 /*
411 * Do not allow users to create failover enabled temporary slots,
412 * because temporary slots will not be synced to the standby.
413 *
414 * However, failover enabled temporary slots can be created during
415 * slot synchronization. See the comments atop slotsync.c for details.
416 */
417 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
419 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
420 errmsg("cannot enable failover for a temporary replication slot"));
421 }
422
423 /*
424 * If some other backend ran this code concurrently with us, we'd likely
425 * both allocate the same slot, and that would be bad. We'd also be at
426 * risk of missing a name collision. Also, we don't want to try to create
427 * a new slot while somebody's busy cleaning up an old one, because we
428 * might both be monkeying with the same directory.
429 */
430 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
431
432 /*
433 * Check for name collision (across the whole array), and identify an
434 * allocatable slot (in the array slice specific to our current use case:
435 * either general, or REPACK only). We need to hold
436 * ReplicationSlotControlLock in shared mode for this, so that nobody else
437 * can change the in_use flags while we're looking at them.
438 */
439 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
440 startpoint = !repack ? 0 : max_replication_slots;
441 endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
443 {
445
446 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
449 errmsg("replication slot \"%s\" already exists", name)));
450
451 if (i >= startpoint && i < endpoint &&
452 !s->in_use && slot == NULL)
453 slot = s;
454 }
455 LWLockRelease(ReplicationSlotControlLock);
456
457 /* If all slots are in use, we're out of luck. */
458 if (slot == NULL)
460 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
461 errmsg("all replication slots are in use"),
462 errhint("Free one or increase \"%s\".",
463 repack ? "max_repack_replication_slots" : "max_replication_slots")));
464
465 /*
466 * Since this slot is not in use, nobody should be looking at any part of
467 * it other than the in_use field unless they're trying to allocate it.
468 * And since we hold ReplicationSlotAllocationLock, nobody except us can
469 * be doing that. So it's safe to initialize the slot.
470 */
471 Assert(!slot->in_use);
473
474 /* first initialize persistent data */
475 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
476 namestrcpy(&slot->data.name, name);
477 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
478 slot->data.persistency = persistency;
479 slot->data.two_phase = two_phase;
481 slot->data.failover = failover;
482 slot->data.synced = synced;
483
484 /* and then data only present in shared memory */
485 slot->just_dirtied = false;
486 slot->dirty = false;
495 slot->inactive_since = 0;
497
498 /*
499 * Create the slot on disk. We haven't actually marked the slot allocated
500 * yet, so no special cleanup is required if this errors out.
501 */
502 CreateSlotOnDisk(slot);
503
504 /*
505 * We need to briefly prevent any other backend from iterating over the
506 * slots while we flip the in_use flag. We also need to set the active
507 * flag while holding the ControlLock as otherwise a concurrent
508 * ReplicationSlotAcquire() could acquire the slot as well.
509 */
510 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
511
512 slot->in_use = true;
513
514 /* We can now mark the slot active, and that makes it our slot. */
515 SpinLockAcquire(&slot->mutex);
518 SpinLockRelease(&slot->mutex);
519 MyReplicationSlot = slot;
520
521 LWLockRelease(ReplicationSlotControlLock);
522
523 /*
524 * Create statistics entry for the new logical slot. We don't collect any
525 * stats for physical slots, so no need to create an entry for the same.
526 * See ReplicationSlotDropPtr for why we need to do this before releasing
527 * ReplicationSlotAllocationLock.
528 */
529 if (SlotIsLogical(slot))
531
532 /*
533 * Now that the slot has been marked as in_use and active, it's safe to
534 * let somebody else try to allocate a slot.
535 */
536 LWLockRelease(ReplicationSlotAllocationLock);
537
538 /* Let everybody know we've modified this slot */
540}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ConditionVariableBroadcast(), CreateSlotOnDisk(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::dirty, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg, ERROR, failover, ReplicationSlotPersistentData::failover, i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, INVALID_PROC_NUMBER, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsBinaryUpgrade, IsLogicalLauncher(), IsSyncingReplicationSlots(), ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, MyDatabaseId, MyProcNumber, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, pgstat_create_replslot(), RecoveryInProgress(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotValidateName(), RS_TEMPORARY, SlotIsLogical, ReplicationSlot::slotsync_skip_reason, SpinLockAcquire(), SpinLockRelease(), SS_SKIP_NONE, ReplicationSlotPersistentData::synced, two_phase, ReplicationSlotPersistentData::two_phase, and ReplicationSlotPersistentData::two_phase_at.

Referenced by create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateReplicationSlot(), repack_setup_logical_decoding(), and synchronize_one_slot().

◆ ReplicationSlotDrop()

void ReplicationSlotDrop ( const char *  name,
bool  nowait 
)

Definition at line 920 of file slot.c.

921{
922 bool is_logical;
923
924 Assert(MyReplicationSlot == NULL);
925
926 ReplicationSlotAcquire(name, nowait, false);
927
928 /*
929 * Do not allow users to drop the slots which are currently being synced
930 * from the primary to the standby.
931 */
934 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
935 errmsg("cannot drop replication slot \"%s\"", name),
936 errdetail("This replication slot is being synchronized from the primary server."));
937
938 is_logical = SlotIsLogical(MyReplicationSlot);
939
941
942 if (is_logical)
944}

References Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg, ERROR, MyReplicationSlot, name, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), SlotIsLogical, and ReplicationSlotPersistentData::synced.

Referenced by DropReplicationSlot(), and pg_drop_replication_slot().

◆ ReplicationSlotDropAcquired()

void ReplicationSlotDropAcquired ( void  )

Definition at line 1042 of file slot.c.

1043{
1045
1046 Assert(MyReplicationSlot != NULL);
1047
1048 /* slot isn't acquired anymore */
1049 MyReplicationSlot = NULL;
1050
1052}

References Assert, MyReplicationSlot, and ReplicationSlotDropPtr().

Referenced by ApplyLauncherMain(), drop_local_obsolete_slots(), repack_cleanup_logical_decoding(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ ReplicationSlotDropPtr()

static void ReplicationSlotDropPtr ( ReplicationSlot slot)
static

Definition at line 1059 of file slot.c.

1060{
1061 char path[MAXPGPATH];
1062 char tmppath[MAXPGPATH];
1063
1064 /*
1065 * If some other backend ran this code concurrently with us, we might try
1066 * to delete a slot with a certain name while someone else was trying to
1067 * create a slot with the same name.
1068 */
1069 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1070
1071 /* Generate pathnames. */
1072 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1073 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1074
1075 /*
1076 * Rename the slot directory on disk, so that we'll no longer recognize
1077 * this as a valid slot. Note that if this fails, we've got to mark the
1078 * slot inactive before bailing out. If we're dropping an ephemeral or a
1079 * temporary slot, we had better never fail hard, as the caller won't expect
1080 * the slot to survive and this might get called during error handling.
1081 */
1082 if (rename(path, tmppath) == 0)
1083 {
1084 /*
1085 * We need to fsync() the directory we just renamed and its parent to
1086 * make sure that our changes are on disk in a crash-safe fashion. If
1087 * fsync() fails, we can't be sure whether the changes are on disk or
1088 * not. For now, we handle that by panicking;
1089 * StartupReplicationSlots() will try to straighten it out after
1090 * restart.
1091 */
1093 fsync_fname(tmppath, true);
1096 }
1097 else
1098 {
1099 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1100
1101 SpinLockAcquire(&slot->mutex);
1103 SpinLockRelease(&slot->mutex);
1104
1105 /* wake up anyone waiting on this slot */
1107
1108 ereport(fail_softly ? WARNING : ERROR,
1110 errmsg("could not rename file \"%s\" to \"%s\": %m",
1111 path, tmppath)));
1112 }
1113
1114 /*
1115 * The slot is definitely gone. Lock out concurrent scans of the array
1116 * long enough to kill it. It's OK to clear the active PID here without
1117 * grabbing the mutex because nobody else can be scanning the array here,
1118 * and nobody can be attached to this slot and thus access it without
1119 * scanning the array.
1120 *
1121 * Also wake up processes waiting for it.
1122 */
1123 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1125 slot->in_use = false;
1126 LWLockRelease(ReplicationSlotControlLock);
1128
1129 /*
1130 * Slot is dead and doesn't prevent resource removal anymore, recompute
1131 * limits.
1132 */
1135
1136 /*
1137 * If removing the directory fails, the worst thing that will happen is
1138 * that the user won't be able to create a new slot with the same name
1139 * until the next server restart. We warn about it, but that's all.
1140 */
1141 if (!rmtree(tmppath, true))
1143 (errmsg("could not remove directory \"%s\"", tmppath)));
1144
1145 /*
1146 * Drop the statistics entry for the replication slot. Do this while
1147 * holding ReplicationSlotAllocationLock so that we don't drop a
1148 * statistics entry for another slot with the same name just created in
1149 * another session.
1150 */
1151 if (SlotIsLogical(slot))
1153
1154 /*
1155 * We release this at the very end, so that nobody starts trying to create
1156 * a slot while we're still cleaning up the detritus of the old one.
1157 */
1158 LWLockRelease(ReplicationSlotAllocationLock);
1159}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, ConditionVariableBroadcast(), ReplicationSlot::data, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg, ERROR, fsync_fname(), ReplicationSlot::in_use, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotPersistentData::persistency, PG_REPLSLOT_DIR, pgstat_drop_replslot(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), rmtree(), RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), sprintf, START_CRIT_SECTION, and WARNING.

Referenced by ReplicationSlotCleanup(), and ReplicationSlotDropAcquired().

◆ ReplicationSlotIndex()

◆ ReplicationSlotInitialize()

void ReplicationSlotInitialize ( void  )

Definition at line 240 of file slot.c.

References before_shmem_exit(), and ReplicationSlotShmemExit().

Referenced by BaseInit().

◆ ReplicationSlotMarkDirty()

◆ ReplicationSlotName()

bool ReplicationSlotName ( int  index,
Name  name 
)

Definition at line 598 of file slot.c.

599{
600 ReplicationSlot *slot;
601 bool found;
602
604
605 /*
606 * Ensure that the slot cannot be dropped while we copy the name. Don't
607 * need the spinlock as the name of an existing slot cannot change.
608 */
609 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
610 found = slot->in_use;
611 if (slot->in_use)
613 LWLockRelease(ReplicationSlotControlLock);
614
615 return found;
616}

References ReplicationSlot::data, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by pgstat_replslot_to_serialized_name_cb().

◆ ReplicationSlotPersist()

◆ ReplicationSlotRelease()

void ReplicationSlotRelease ( void  )

Definition at line 769 of file slot.c.

770{
772 char *slotname = NULL; /* keep compiler quiet */
773 bool is_logical;
774 TimestampTz now = 0;
775
776 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
777
778 is_logical = SlotIsLogical(slot);
779
780 if (am_walsender)
781 slotname = pstrdup(NameStr(slot->data.name));
782
783 if (slot->data.persistency == RS_EPHEMERAL)
784 {
785 /*
786 * Delete the slot. There is no !PANIC case where this is allowed to
787 * fail; all that may happen is an incomplete cleanup of the on-disk
788 * data.
789 */
791
792 /*
793 * Request to disable logical decoding, even though this slot may not
794 * have been the last logical slot. The checkpointer will verify if
795 * logical decoding should actually be disabled.
796 */
797 if (is_logical)
799 }
800
801 /*
802 * If slot needed to temporarily restrain both data and catalog xmin to
803 * create the catalog snapshot, remove that temporary constraint.
804 * Snapshots can only be exported while the initial snapshot is still
805 * acquired.
806 */
807 if (!TransactionIdIsValid(slot->data.xmin) &&
809 {
810 SpinLockAcquire(&slot->mutex);
812 SpinLockRelease(&slot->mutex);
814 }
815
816 /*
817 * Set the time since the slot has become inactive. We get the current
818 * time beforehand to avoid system call while holding the spinlock.
819 */
821
822 if (slot->data.persistency == RS_PERSISTENT)
823 {
824 /*
825 * Mark persistent slot inactive. We're not freeing it, just
826 * disconnecting, but wake up others that may be waiting for it.
827 */
828 SpinLockAcquire(&slot->mutex);
831 SpinLockRelease(&slot->mutex);
833 }
834 else
836
837 MyReplicationSlot = NULL;
838
839 /* might not have been set when we've been a plain slot */
840 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
841 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
843 LWLockRelease(ProcArrayLock);
844
845 if (am_walsender)
846 {
848 is_logical
849 ? errmsg("released logical replication slot \"%s\"",
850 slotname)
851 : errmsg("released physical replication slot \"%s\"",
852 slotname));
853
854 pfree(slotname);
855 }
856}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, am_walsender, Assert, ConditionVariableBroadcast(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, ereport, errmsg, GetCurrentTimestamp(), INVALID_PROC_NUMBER, InvalidTransactionId, LOG, log_replication_commands, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, now(), ReplicationSlotPersistentData::persistency, pfree(), PGPROC::pgxactoff, ProcGlobal, pstrdup(), ReplicationSlotDropAcquired(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotSetInactiveSince(), RequestDisableLogicalDecoding(), RS_EPHEMERAL, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), PGPROC::statusFlags, PROC_HDR::statusFlags, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by binary_upgrade_check_logical_slot_pending_wal(), binary_upgrade_create_conflict_detection_slot(), copy_replication_slot(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), PostgresMain(), ReplicationSlotAlter(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), synchronize_one_slot(), and WalSndErrorCleanup().

◆ ReplicationSlotReserveWal()

void ReplicationSlotReserveWal ( void  )

Definition at line 1711 of file slot.c.

1712{
1714 XLogSegNo segno;
1715 XLogRecPtr restart_lsn;
1716
1717 Assert(slot != NULL);
1720
1721 /*
1722 * The replication slot mechanism is used to prevent the removal of
1723 * required WAL.
1724 *
1725 * Acquire an exclusive lock to prevent the checkpoint process from
1726 * concurrently computing the minimum slot LSN (see
1727 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1728 * replication cannot be removed during a checkpoint.
1729 *
1730 * The mechanism is reliable because if WAL reservation occurs first, the
1731 * checkpoint must wait for the restart_lsn update before determining the
1732 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1733 * first, subsequent WAL reservations will select positions at or beyond
1734 * the redo pointer of that checkpoint.
1735 */
1736 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1737
1738 /*
1739 * For logical slots log a standby snapshot and start logical decoding at
1740 * exactly that position. That allows the slot to start up more quickly.
1741 * But on a standby we cannot do WAL writes, so just use the replay
1742 * pointer; effectively, an attempt to create a logical slot on standby
1743 * will cause it to wait for an xl_running_xact record to be logged
1744 * independently on the primary, so that a snapshot can be built using the
1745 * record.
1746 *
1747 * None of this is needed (or indeed helpful) for physical slots as
1748 * they'll start replay at the last logged checkpoint anyway. Instead,
1749 * return the location of the last redo LSN, from which a base backup
1750 * has to start replay.
1751 */
1752 if (SlotIsPhysical(slot))
1753 restart_lsn = GetRedoRecPtr();
1754 else if (RecoveryInProgress())
1755 restart_lsn = GetXLogReplayRecPtr(NULL);
1756 else
1757 restart_lsn = GetXLogInsertRecPtr();
1758
1759 SpinLockAcquire(&slot->mutex);
1760 slot->data.restart_lsn = restart_lsn;
1761 SpinLockRelease(&slot->mutex);
1762
1763 /* prevent WAL removal as fast as possible */
1765
1766 /* Checkpoint shouldn't remove the required WAL. */
1768 if (XLogGetLastRemovedSegno() >= segno)
1769 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1770 NameStr(slot->data.name));
1771
1772 LWLockRelease(ReplicationSlotAllocationLock);
1773
1774 if (!RecoveryInProgress() && SlotIsLogical(slot))
1775 {
1776 XLogRecPtr flushptr;
1777
1778 /* make sure we have enough information to start */
1779 flushptr = LogStandbySnapshot(InvalidOid);
1780
1781 /* and make sure it's fsynced to disk */
1782 XLogFlush(flushptr);
1783 }
1784}

References Assert, ReplicationSlot::data, elog, ERROR, GetRedoRecPtr(), GetXLogInsertRecPtr(), GetXLogReplayRecPtr(), InvalidOid, ReplicationSlot::last_saved_restart_lsn, LogStandbySnapshot(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SlotIsLogical, SlotIsPhysical, SpinLockAcquire(), SpinLockRelease(), wal_segment_size, XLByteToSeg, XLogFlush(), XLogGetLastRemovedSegno(), and XLogRecPtrIsValid.

Referenced by create_physical_replication_slot(), CreateInitDecodingContext(), and CreateReplicationSlot().

◆ ReplicationSlotSave()

◆ ReplicationSlotsComputeLogicalRestartLSN()

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN ( void  )

Definition at line 1378 of file slot.c.

1379{
1381 int i;
1382
1384 return InvalidXLogRecPtr;
1385
1386 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1387
1389 {
1390 ReplicationSlot *s;
1391 XLogRecPtr restart_lsn;
1392 XLogRecPtr last_saved_restart_lsn;
1393 bool invalidated;
1394 ReplicationSlotPersistency persistency;
1395
1397
1398 /* cannot change while ReplicationSlotControlLock is held */
1399 if (!s->in_use)
1400 continue;
1401
1402 /* we're only interested in logical slots */
1403 if (!SlotIsLogical(s))
1404 continue;
1405
1406 /* read once, it's ok if it increases while we're checking */
1408 persistency = s->data.persistency;
1409 restart_lsn = s->data.restart_lsn;
1410 invalidated = s->data.invalidated != RS_INVAL_NONE;
1411 last_saved_restart_lsn = s->last_saved_restart_lsn;
1413
1414 /* invalidated slots need not apply */
1415 if (invalidated)
1416 continue;
1417
1418 /*
1419 * For persistent slots, use last_saved_restart_lsn to compute the
1420 * oldest LSN for removal of WAL segments. The segments between
1421 * last_saved_restart_lsn and restart_lsn might be needed by a
1422 * persistent slot in the case of database crash. Non-persistent
1423 * slots can't survive the database crash, so we don't care about
1424 * last_saved_restart_lsn for them.
1425 */
1426 if (persistency == RS_PERSISTENT)
1427 {
1428 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1429 restart_lsn > last_saved_restart_lsn)
1430 {
1431 restart_lsn = last_saved_restart_lsn;
1432 }
1433 }
1434
1435 if (!XLogRecPtrIsValid(restart_lsn))
1436 continue;
1437
1438 if (!XLogRecPtrIsValid(result) ||
1439 restart_lsn < result)
1440 result = restart_lsn;
1441 }
1442
1443 LWLockRelease(ReplicationSlotControlLock);
1444
1445 return result;
1446}

References ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, result, RS_INVAL_NONE, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by CheckPointLogicalRewriteHeap(), and CheckPointSnapBuild().

◆ ReplicationSlotsComputeRequiredLSN()

void ReplicationSlotsComputeRequiredLSN ( void  )

Definition at line 1308 of file slot.c.

1309{
1310 int i;
1311 XLogRecPtr min_required = InvalidXLogRecPtr;
1312
1313 Assert(ReplicationSlotCtl != NULL);
1314
1315 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1317 {
1319 XLogRecPtr restart_lsn;
1320 XLogRecPtr last_saved_restart_lsn;
1321 bool invalidated;
1322 ReplicationSlotPersistency persistency;
1323
1324 if (!s->in_use)
1325 continue;
1326
1328 persistency = s->data.persistency;
1329 restart_lsn = s->data.restart_lsn;
1330 invalidated = s->data.invalidated != RS_INVAL_NONE;
1331 last_saved_restart_lsn = s->last_saved_restart_lsn;
1333
1334 /* invalidated slots need not apply */
1335 if (invalidated)
1336 continue;
1337
1338 /*
1339 * For persistent slots, use last_saved_restart_lsn to compute the
1340 * oldest LSN for removal of WAL segments. The segments between
1341 * last_saved_restart_lsn and restart_lsn might be needed by a
1342 * persistent slot in the case of database crash. Non-persistent
1343 * slots can't survive the database crash, so we don't care about
1344 * last_saved_restart_lsn for them.
1345 */
1346 if (persistency == RS_PERSISTENT)
1347 {
1348 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1349 restart_lsn > last_saved_restart_lsn)
1350 {
1351 restart_lsn = last_saved_restart_lsn;
1352 }
1353 }
1354
1355 if (XLogRecPtrIsValid(restart_lsn) &&
1356 (!XLogRecPtrIsValid(min_required) ||
1357 restart_lsn < min_required))
1358 min_required = restart_lsn;
1359 }
1360 LWLockRelease(ReplicationSlotControlLock);
1361
1363}

References Assert, ReplicationSlot::data, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SpinLockAcquire(), SpinLockRelease(), XLogRecPtrIsValid, and XLogSetReplicationSlotMinimumLSN().

Referenced by CheckPointReplicationSlots(), copy_replication_slot(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), ReplicationSlotDropPtr(), ReplicationSlotReserveWal(), reserve_wal_for_local_slot(), StartupReplicationSlots(), and update_local_synced_slot().

◆ ReplicationSlotsComputeRequiredXmin()

void ReplicationSlotsComputeRequiredXmin ( bool  already_locked)

Definition at line 1226 of file slot.c.

1227{
1228 int i;
1230 TransactionId agg_catalog_xmin = InvalidTransactionId;
1231
1232 Assert(ReplicationSlotCtl != NULL);
1233 Assert(!already_locked ||
1234 (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
1235 LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
1236
1237 /*
1238 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1239 * values, so no backend updates the initial xmin for newly created slot
1240 * concurrently. A shared lock is used here to minimize lock contention,
1241 * especially when many slots exist and advancements occur frequently.
1242 * This is safe since an exclusive lock is taken during initial slot xmin
1243 * update in slot creation.
1244 *
1245 * One might think that we can hold the ProcArrayLock exclusively and
1246 * update the slot xmin values, but it could increase lock contention on
1247 * the ProcArrayLock, which is not great since this function can be called
1248 * at non-negligible frequency.
1249 *
1250 * Concurrent invocation of this function may cause the computed slot xmin
1251 * to regress. However, this is harmless because tuples prior to the most
1252 * recent xmin are no longer useful once advancement occurs (see
1253 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1254 * before updating the effective_xmin). Thus, such regression merely
1255 * prevents VACUUM from prematurely removing tuples without causing the
1256 * early deletion of required data.
1257 */
1258 if (!already_locked)
1259 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1260
1262 {
1264 TransactionId effective_xmin;
1265 TransactionId effective_catalog_xmin;
1266 bool invalidated;
1267
1268 if (!s->in_use)
1269 continue;
1270
1272 effective_xmin = s->effective_xmin;
1273 effective_catalog_xmin = s->effective_catalog_xmin;
1274 invalidated = s->data.invalidated != RS_INVAL_NONE;
1276
1277 /* invalidated slots need not apply */
1278 if (invalidated)
1279 continue;
1280
1281 /* check the data xmin */
1282 if (TransactionIdIsValid(effective_xmin) &&
1283 (!TransactionIdIsValid(agg_xmin) ||
1284 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1285 agg_xmin = effective_xmin;
1286
1287 /* check the catalog xmin */
1288 if (TransactionIdIsValid(effective_catalog_xmin) &&
1289 (!TransactionIdIsValid(agg_catalog_xmin) ||
1290 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1291 agg_catalog_xmin = effective_catalog_xmin;
1292 }
1293
1294 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1295
1296 if (!already_locked)
1297 LWLockRelease(ReplicationSlotControlLock);
1298}

References Assert, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockHeldByMeInMode(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ProcArraySetReplicationSlotXmin(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by copy_replication_slot(), CreateInitDecodingContext(), init_conflict_slot_xmin(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalReplicationSlotNewXmin(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), StartupReplicationSlots(), synchronize_one_slot(), update_conflict_slot_xmin(), and update_local_synced_slot().

◆ ReplicationSlotsCountDBSlots()

bool ReplicationSlotsCountDBSlots ( Oid  dboid,
int *  nslots,
int *  nactive 
)

Definition at line 1457 of file slot.c.

1458{
1459 int i;
1460
1461 *nslots = *nactive = 0;
1462
1464 return false;
1465
1466 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1468 {
1469 ReplicationSlot *s;
1470
1472
1473 /* cannot change while ReplicationSlotControlLock is held */
1474 if (!s->in_use)
1475 continue;
1476
1477 /* only logical slots are database specific, skip */
1478 if (!SlotIsLogical(s))
1479 continue;
1480
1481 /* not our database, skip */
1482 if (s->data.database != dboid)
1483 continue;
1484
1485 /* NB: intentionally counting invalidated slots */
1486
1487 /* count slots with spinlock held */
1489 (*nslots)++;
1491 (*nactive)++;
1493 }
1494 LWLockRelease(ReplicationSlotControlLock);
1495
1496 if (*nslots > 0)
1497 return true;
1498 return false;
1499}

References ReplicationSlot::active_proc, ReplicationSlot::data, ReplicationSlotPersistentData::database, i, ReplicationSlot::in_use, INVALID_PROC_NUMBER, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by dropdb().

◆ ReplicationSlotsDropDBSlots()

void ReplicationSlotsDropDBSlots ( Oid  dboid)

Definition at line 1518 of file slot.c.

1519{
1520 int i;
1521 bool found_valid_logicalslot;
1522 bool dropped = false;
1523
1525 return;
1526
1527restart:
1528 found_valid_logicalslot = false;
1529 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1531 {
1532 ReplicationSlot *s;
1533 char *slotname;
1534 ProcNumber active_proc;
1535
1537
1538 /* cannot change while ReplicationSlotControlLock is held */
1539 if (!s->in_use)
1540 continue;
1541
1542 /* only logical slots are database specific, skip */
1543 if (!SlotIsLogical(s))
1544 continue;
1545
1546 /*
1547 * Check logical slots on other databases too so we can disable
1548 * logical decoding only if no valid logical slots remain in the cluster.
1549 */
1551 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
1553
1554 /* not our database, skip */
1555 if (s->data.database != dboid)
1556 continue;
1557
1558 /* NB: intentionally including invalidated slots to drop */
1559
1560 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1562 /* can't change while ReplicationSlotControlLock is held */
1563 slotname = NameStr(s->data.name);
1564 active_proc = s->active_proc;
1565 if (active_proc == INVALID_PROC_NUMBER)
1566 {
1569 }
1571
1572 /*
1573 * Even though we hold an exclusive lock on the database object a
1574 * logical slot for that DB can still be active, e.g. if it's
1575 * concurrently being dropped by a backend connected to another DB.
1576 *
1577 * That's fairly unlikely in practice, so we'll just bail out.
1578 *
1579 * The slot sync worker holds a shared lock on the database before
1580 * operating on synced logical slots to avoid conflict with the drop
1581 * happening here. The persistent synced slots are thus safe but there
1582 * is a possibility that the slot sync worker has created a temporary
1583 * slot (which stays active even on release) and we are trying to drop
1584 * that here. In practice, the chances of hitting this scenario are
1585 * low because during slot synchronization, the temporary slot is
1586 * immediately converted to persistent and thus is safe due to the
1587 * shared lock taken on the database. So, we'll just bail out in such
1588 * a case.
1589 *
1590 * XXX: We can consider shutting down the slot sync worker before
1591 * trying to drop synced temporary slots here.
1592 */
1593 if (active_proc != INVALID_PROC_NUMBER)
1594 ereport(ERROR,
1595 (errcode(ERRCODE_OBJECT_IN_USE),
1596 errmsg("replication slot \"%s\" is active for PID %d",
1597 slotname, GetPGProcByNumber(active_proc)->pid)));
1598
1599 /*
1600 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1601 * holding ReplicationSlotControlLock over filesystem operations,
1602 * release ReplicationSlotControlLock and use
1603 * ReplicationSlotDropAcquired.
1604 *
1605 * As that means the set of slots could change, restart scan from the
1606 * beginning each time we release the lock.
1607 */
1608 LWLockRelease(ReplicationSlotControlLock);
1610 dropped = true;
1611 goto restart;
1612 }
1613 LWLockRelease(ReplicationSlotControlLock);
1614
1615 if (dropped && !found_valid_logicalslot)
1617}

References ReplicationSlot::active_proc, ReplicationSlot::data, ReplicationSlotPersistentData::database, ereport, errcode(), errmsg, ERROR, GetPGProcByNumber, i, ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by dbase_redo(), and dropdb().

◆ ReplicationSlotShmemExit()

static void ReplicationSlotShmemExit ( int  code,
Datum  arg 
)
static

Definition at line 249 of file slot.c.

250{
251 /* Make sure active replication slots are released */
252 if (MyReplicationSlot != NULL)
254
255 /* Also cleanup all the temporary slots. */
257}

References MyReplicationSlot, ReplicationSlotCleanup(), and ReplicationSlotRelease().

Referenced by ReplicationSlotInitialize().

◆ ReplicationSlotsShmemInit()

static void ReplicationSlotsShmemInit ( void *  arg)
static

◆ ReplicationSlotsShmemRequest()

static void ReplicationSlotsShmemRequest ( void *  arg)
static

Definition at line 200 of file slot.c.

201{
202 Size size;
203
205 return;
206
207 size = offsetof(ReplicationSlotCtlData, replication_slots);
208 size = add_size(size,
210 sizeof(ReplicationSlot)));
211 ShmemRequestStruct(.name = "ReplicationSlot Ctl",
212 .size = size,
213 .ptr = (void **) &ReplicationSlotCtl,
214 );
215}

References add_size(), max_repack_replication_slots, max_replication_slots, mul_size(), name, ReplicationSlotCtl, and ShmemRequestStruct.

◆ ReplicationSlotValidateName()

bool ReplicationSlotValidateName ( const char *  name,
bool  allow_reserved_name,
int  elevel 
)

Definition at line 265 of file slot.c.

267{
268 int err_code;
269 char *err_msg = NULL;
270 char *err_hint = NULL;
271
272 if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
273 &err_code, &err_msg, &err_hint))
274 {
275 /*
276 * Use errmsg_internal() and errhint_internal() instead of errmsg()
277 * and errhint(), since the messages from
278 * ReplicationSlotValidateNameInternal() are already translated. This
279 * avoids double translation.
280 */
281 ereport(elevel,
282 errcode(err_code),
283 errmsg_internal("%s", err_msg),
284 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
285
286 pfree(err_msg);
287 if (err_hint != NULL)
288 pfree(err_hint);
289 return false;
290 }
291
292 return true;
293}

References ereport, errcode(), errhint_internal(), errmsg_internal(), name, pfree(), and ReplicationSlotValidateNameInternal().

Referenced by parse_subscription_options(), ReplicationSlotCreate(), and StartupReorderBuffer().

◆ ReplicationSlotValidateNameInternal()

bool ReplicationSlotValidateNameInternal ( const char *  name,
bool  allow_reserved_name,
int *  err_code,
char **  err_msg,
char **  err_hint 
)

Definition at line 310 of file slot.c.

312{
313 const char *cp;
314
315 if (strlen(name) == 0)
316 {
317 *err_code = ERRCODE_INVALID_NAME;
318 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
319 *err_hint = NULL;
320 return false;
321 }
322
323 if (strlen(name) >= NAMEDATALEN)
324 {
325 *err_code = ERRCODE_NAME_TOO_LONG;
326 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
327 *err_hint = NULL;
328 return false;
329 }
330
331 for (cp = name; *cp; cp++)
332 {
333 if (!((*cp >= 'a' && *cp <= 'z')
334 || (*cp >= '0' && *cp <= '9')
335 || (*cp == '_')))
336 {
337 *err_code = ERRCODE_INVALID_NAME;
338 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
339 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
340 return false;
341 }
342 }
343
344 if (!allow_reserved_name && IsSlotForConflictCheck(name))
345 {
346 *err_code = ERRCODE_RESERVED_NAME;
347 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
348 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 return false;
351 }
352
353 return true;
354}

References _, CONFLICT_DETECTION_SLOT, IsSlotForConflictCheck(), name, NAMEDATALEN, and psprintf().

Referenced by check_primary_slot_name(), ReplicationSlotValidateName(), and validate_sync_standby_slots().

◆ ReportSlotInvalidation()

static void ReportSlotInvalidation ( ReplicationSlotInvalidationCause  cause,
bool  terminating,
int  pid,
NameData  slotname,
XLogRecPtr  restart_lsn,
XLogRecPtr  oldestLSN,
TransactionId  snapshotConflictHorizon,
long  slot_idle_seconds 
)
static

Definition at line 1790 of file slot.c.

1798{
1799 StringInfoData err_detail;
1800 StringInfoData err_hint;
1801
1802 initStringInfo(&err_detail);
1803 initStringInfo(&err_hint);
1804
1805 switch (cause)
1806 {
1808 {
1809 uint64 ex = oldestLSN - restart_lsn;
1810
1811 appendStringInfo(&err_detail,
1812 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1813 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1814 ex),
1815 LSN_FORMAT_ARGS(restart_lsn),
1816 ex);
1817 /* translator: %s is a GUC variable name */
1818 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1819 "max_slot_wal_keep_size");
1820 break;
1821 }
1822 case RS_INVAL_HORIZON:
1823 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1824 snapshotConflictHorizon);
1825 break;
1826
1827 case RS_INVAL_WAL_LEVEL:
1828 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1829 break;
1830
1832 {
1833 /* translator: %s is a GUC variable name */
1834 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1835 slot_idle_seconds, "idle_replication_slot_timeout",
1837 /* translator: %s is a GUC variable name */
1838 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1839 "idle_replication_slot_timeout");
1840 break;
1841 }
1842 case RS_INVAL_NONE:
1844 }
1845
1846 ereport(LOG,
1847 terminating ?
1848 errmsg("terminating process %d to release replication slot \"%s\"",
1849 pid, NameStr(slotname)) :
1850 errmsg("invalidating obsolete replication slot \"%s\"",
1851 NameStr(slotname)),
1852 errdetail_internal("%s", err_detail.data),
1853 err_hint.len ? errhint("%s", err_hint.data) : 0);
1854
1855 pfree(err_detail.data);
1856 pfree(err_hint.data);
1857}

References _, appendStringInfo(), appendStringInfoString(), StringInfoData::data, ereport, errdetail_internal(), errhint(), errmsg, idle_replication_slot_timeout_secs, initStringInfo(), StringInfoData::len, LOG, LSN_FORMAT_ARGS, NameStr, ngettext, pfree(), pg_unreachable, RS_INVAL_HORIZON, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, and RS_INVAL_WAL_REMOVED.

Referenced by InvalidatePossiblyObsoleteSlot().

◆ RestoreSlotFromDisk()

static void RestoreSlotFromDisk ( const char *  name)
static

Definition at line 2687 of file slot.c.

2688{
2690 int i;
2691 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2692 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2693 int fd;
2694 bool restored = false;
2695 int readBytes;
2696 pg_crc32c checksum;
2697 TimestampTz now = 0;
2698
2699 /* no need to lock here, no concurrent access allowed yet */
2700
2701 /* delete temp file if it exists */
2702 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2703 sprintf(path, "%s/state.tmp", slotdir);
2704 if (unlink(path) < 0 && errno != ENOENT)
2705 ereport(PANIC,
2707 errmsg("could not remove file \"%s\": %m", path)));
2708
2709 sprintf(path, "%s/state", slotdir);
2710
2711 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2712
2713 /* on some operating systems fsyncing a file requires O_RDWR */
2714 fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2715
2716 /*
2717 * We do not need to handle this as we are rename()ing the directory into
2718 * place only after we fsync()ed the state file.
2719 */
2720 if (fd < 0)
2721 ereport(PANIC,
2723 errmsg("could not open file \"%s\": %m", path)));
2724
2725 /*
2726 * Sync state file before we're reading from it. We might have crashed
2727 * while it wasn't synced yet and we shouldn't continue on that basis.
2728 */
2729 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2730 if (pg_fsync(fd) != 0)
2731 ereport(PANIC,
2733 errmsg("could not fsync file \"%s\": %m",
2734 path)));
2736
2737 /* Also sync the parent directory */
2739 fsync_fname(slotdir, true);
2741
2742 /* read part of statefile that's guaranteed to be version independent */
2743 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2744 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2746 if (readBytes != ReplicationSlotOnDiskConstantSize)
2747 {
2748 if (readBytes < 0)
2749 ereport(PANIC,
2751 errmsg("could not read file \"%s\": %m", path)));
2752 else
2753 ereport(PANIC,
2755 errmsg("could not read file \"%s\": read %d of %zu",
2756 path, readBytes,
2758 }
2759
2760 /* verify magic */
2761 if (cp.magic != SLOT_MAGIC)
2762 ereport(PANIC,
2764 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2765 path, cp.magic, SLOT_MAGIC)));
2766
2767 /* verify version */
2768 if (cp.version != SLOT_VERSION)
2769 ereport(PANIC,
2771 errmsg("replication slot file \"%s\" has unsupported version %u",
2772 path, cp.version)));
2773
2774 /* boundary check on length */
2776 ereport(PANIC,
2778 errmsg("replication slot file \"%s\" has corrupted length %u",
2779 path, cp.length)));
2780
2781 /* Now that we know the size, read the entire file */
2782 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2783 readBytes = read(fd,
2784 (char *) &cp + ReplicationSlotOnDiskConstantSize,
2785 cp.length);
2787 if (readBytes != cp.length)
2788 {
2789 if (readBytes < 0)
2790 ereport(PANIC,
2792 errmsg("could not read file \"%s\": %m", path)));
2793 else
2794 ereport(PANIC,
2796 errmsg("could not read file \"%s\": read %d of %zu",
2797 path, readBytes, (Size) cp.length)));
2798 }
2799
2800 if (CloseTransientFile(fd) != 0)
2801 ereport(PANIC,
2803 errmsg("could not close file \"%s\": %m", path)));
2804
2805 /* now verify the CRC */
2806 INIT_CRC32C(checksum);
2807 COMP_CRC32C(checksum,
2810 FIN_CRC32C(checksum);
2811
2812 if (!EQ_CRC32C(checksum, cp.checksum))
2813 ereport(PANIC,
2814 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2815 path, checksum, cp.checksum)));
2816
2817 /*
2818 * If we crashed with an ephemeral slot active, don't restore but delete
2819 * it.
2820 */
2822 {
2823 if (!rmtree(slotdir, true))
2824 {
2826 (errmsg("could not remove directory \"%s\"",
2827 slotdir)));
2828 }
2830 return;
2831 }
2832
2833 /*
2834 * Verify that requirements for the specific slot type are met. That's
2835 * important because if these aren't met we're not guaranteed to retain
2836 * all the necessary resources for the slot.
2837 *
2838 * NB: We have to do so *after* the above checks for ephemeral slots,
2839 * because otherwise a slot that shouldn't exist anymore could prevent
2840 * restarts.
2841 *
2842 * NB: Changing the requirements here also requires adapting
2843 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2844 */
2845 if (cp.slotdata.database != InvalidOid)
2846 {
2848 ereport(FATAL,
2849 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2850 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2851 NameStr(cp.slotdata.name)),
2852 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2853
2854 /*
2855 * In standby mode, the hot standby must be enabled. This check is
2856 * necessary to ensure logical slots are invalidated when they become
2857 * incompatible due to insufficient wal_level. Otherwise, if the
2858 * primary reduces effective_wal_level below logical (disabling logical
2859 * decoding) while hot standby is disabled, logical slots would remain
2860 * valid even after promotion.
2861 */
2863 ereport(FATAL,
2864 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2865 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2866 NameStr(cp.slotdata.name)),
2867 errhint("Change \"hot_standby\" to be \"on\".")));
2868 }
2869 else if (wal_level < WAL_LEVEL_REPLICA)
2870 ereport(FATAL,
2871 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2872 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2873 NameStr(cp.slotdata.name)),
2874 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2875
2876 /*
2877 * Nothing can be active yet, don't lock anything. Note we iterate up to
2878 * max_replication_slots instead of adding max_repack_replication_slots as
2879 * in all other places, because we must enforce the GUC value in case
2880 * there were more slots before the shutdown than the current setting
2881 * allows.
2882 */
2883 for (i = 0; i < max_replication_slots; i++)
2884 {
2885 ReplicationSlot *slot;
2886
2888
2889 if (slot->in_use)
2890 continue;
2891
2892 /* restore the entire set of persistent data */
2893 memcpy(&slot->data, &cp.slotdata,
2895
2896 /* initialize in memory state */
2897 slot->effective_xmin = cp.slotdata.xmin;
2901
2906
2907 slot->in_use = true;
2909
2910 /*
2911 * Set the time since the slot has become inactive after loading the
2912 * slot from the disk into memory. Whoever acquires the slot i.e.
2913 * makes the slot active will reset it. Use the same inactive_since
2914 * time for all the slots.
2915 */
2916 if (now == 0)
2918
2920
2921 restored = true;
2922 break;
2923 }
2924
2925 if (!restored)
2926 ereport(FATAL,
2927 (errmsg("too many replication slots active before shutdown"),
2928 errhint("Increase \"max_replication_slots\" and try again.")));
2929}

References ReplicationSlot::active_proc, ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotOnDisk::checksum, CloseTransientFile(), COMP_CRC32C, ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlotPersistentData::database, DEBUG1, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, elog, EnableHotStandby, END_CRIT_SECTION, EQ_CRC32C, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errhint(), errmsg, FATAL, fd(), FIN_CRC32C, fsync_fname(), GetCurrentTimestamp(), i, ReplicationSlot::in_use, INIT_CRC32C, INVALID_PROC_NUMBER, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, ReplicationSlotOnDisk::length, ReplicationSlotOnDisk::magic, max_replication_slots, MAXPGPATH, memcpy(), name, ReplicationSlotPersistentData::name, NameStr, now(), OpenTransientFile(), PANIC, ReplicationSlotPersistentData::persistency, PG_BINARY, pg_fsync(), PG_REPLSLOT_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotOnDiskChecksummedSize, ReplicationSlotOnDiskConstantSize, ReplicationSlotOnDiskNotChecksummedSize, ReplicationSlotOnDiskV2Size, ReplicationSlotSetInactiveSince(), ReplicationSlotPersistentData::restart_lsn, rmtree(), RS_PERSISTENT, SLOT_MAGIC, SLOT_VERSION, ReplicationSlotOnDisk::slotdata, sprintf, StandbyMode, START_CRIT_SECTION, ReplicationSlotOnDisk::version, wal_level, WAL_LEVEL_REPLICA, WARNING, and ReplicationSlotPersistentData::xmin.

Referenced by StartupReplicationSlots().

◆ SaveSlotToPath()

static void SaveSlotToPath ( ReplicationSlot slot,
const char *  dir,
int  elevel 
)
static

Definition at line 2524 of file slot.c.

2525{
2526 char tmppath[MAXPGPATH];
2527 char path[MAXPGPATH];
2528 int fd;
2530 bool was_dirty;
2531
2532 /* first check whether there's something to write out */
2533 SpinLockAcquire(&slot->mutex);
2534 was_dirty = slot->dirty;
2535 slot->just_dirtied = false;
2536 SpinLockRelease(&slot->mutex);
2537
2538 /* and don't do anything if there's nothing to write */
2539 if (!was_dirty)
2540 return;
2541
2543
2544 /* silence valgrind :( */
2545 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2546
2547 sprintf(tmppath, "%s/state.tmp", dir);
2548 sprintf(path, "%s/state", dir);
2549
2550 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2551 if (fd < 0)
2552 {
2553 /*
2554 * If not an ERROR, then release the lock before returning. In case
2555 * of an ERROR, the error recovery path automatically releases the
2556 * lock, but no harm in explicitly releasing even in that case. Note
2557 * that LWLockRelease() could affect errno.
2558 */
2559 int save_errno = errno;
2560
2562 errno = save_errno;
2563 ereport(elevel,
2565 errmsg("could not create file \"%s\": %m",
2566 tmppath)));
2567 return;
2568 }
2569
2570 cp.magic = SLOT_MAGIC;
2572 cp.version = SLOT_VERSION;
2574
2575 SpinLockAcquire(&slot->mutex);
2576
2577 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2578
2579 SpinLockRelease(&slot->mutex);
2580
2584 FIN_CRC32C(cp.checksum);
2585
2586 errno = 0;
2587 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2588 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2589 {
2590 int save_errno = errno;
2591
2594 unlink(tmppath);
2596
2597 /* if write didn't set errno, assume problem is no disk space */
2598 errno = save_errno ? save_errno : ENOSPC;
2599 ereport(elevel,
2601 errmsg("could not write to file \"%s\": %m",
2602 tmppath)));
2603 return;
2604 }
2606
2607 /* fsync the temporary file */
2608 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2609 if (pg_fsync(fd) != 0)
2610 {
2611 int save_errno = errno;
2612
2615 unlink(tmppath);
2617
2618 errno = save_errno;
2619 ereport(elevel,
2621 errmsg("could not fsync file \"%s\": %m",
2622 tmppath)));
2623 return;
2624 }
2626
2627 if (CloseTransientFile(fd) != 0)
2628 {
2629 int save_errno = errno;
2630
2631 unlink(tmppath);
2633
2634 errno = save_errno;
2635 ereport(elevel,
2637 errmsg("could not close file \"%s\": %m",
2638 tmppath)));
2639 return;
2640 }
2641
2642 /* rename to permanent file, fsync file and directory */
2643 if (rename(tmppath, path) != 0)
2644 {
2645 int save_errno = errno;
2646
2647 unlink(tmppath);
2649
2650 errno = save_errno;
2651 ereport(elevel,
2653 errmsg("could not rename file \"%s\" to \"%s\": %m",
2654 tmppath, path)));
2655 return;
2656 }
2657
2658 /*
2659 * See CreateSlotOnDisk() for the reasoning behind using a critical section.
2660 */
2662
2663 fsync_fname(path, false);
2664 fsync_fname(dir, true);
2666
2668
2669 /*
2670 * Successfully wrote; unset the dirty bit unless somebody dirtied the slot
2671 * again in the meantime, and remember the confirmed_flush LSN value.
2672 */
2673 SpinLockAcquire(&slot->mutex);
2674 if (!slot->just_dirtied)
2675 slot->dirty = false;
2678 SpinLockRelease(&slot->mutex);
2679
2681}

References ReplicationSlotOnDisk::checksum, CloseTransientFile(), COMP_CRC32C, ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlot::dirty, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg, fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, ReplicationSlot::io_in_progress_lock, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, ReplicationSlotOnDisk::length, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlotOnDisk::magic, MAXPGPATH, memcpy(), ReplicationSlot::mutex, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_REPLSLOT_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), ReplicationSlotOnDiskChecksummedSize, ReplicationSlotOnDiskNotChecksummedSize, ReplicationSlotOnDiskV2Size, ReplicationSlotPersistentData::restart_lsn, SLOT_MAGIC, SLOT_VERSION, ReplicationSlotOnDisk::slotdata, SpinLockAcquire(), SpinLockRelease(), sprintf, START_CRIT_SECTION, ReplicationSlotOnDisk::version, and write.

Referenced by CheckPointReplicationSlots(), CreateSlotOnDisk(), and ReplicationSlotSave().

◆ SearchNamedReplicationSlot()

ReplicationSlot * SearchNamedReplicationSlot ( const char *  name,
bool  need_lock 
)

Definition at line 548 of file slot.c.

549{
550 int i;
551 ReplicationSlot *slot = NULL;
552
553 if (need_lock)
554 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
555
557 {
559
560 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
561 {
562 slot = s;
563 break;
564 }
565 }
566
567 if (need_lock)
568 LWLockRelease(ReplicationSlotControlLock);
569
570 return slot;
571}

References ReplicationSlot::data, i, ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, name, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by acquire_conflict_slot_if_exists(), get_replslot_index(), pg_ls_replslotdir(), pgstat_reset_replslot(), ReadReplicationSlot(), ReplicationSlotAcquire(), StandbySlotsHaveCaughtup(), and synchronize_one_slot().

◆ SlotExistsInSyncStandbySlots()

bool SlotExistsInSyncStandbySlots ( const char *  slot_name)

Definition at line 3080 of file slot.c.

3081{
3082 const char *standby_slot_name;
3083
3084 /* Return false if there is no value in synchronized_standby_slots */
3086 return false;
3087
3088 /*
3089 * XXX: We are not expecting this list to be long so a linear search
3090 * shouldn't hurt but if that turns out not to be true then we can cache
3091 * this information for each WalSender as well.
3092 */
3093 standby_slot_name = synchronized_standby_slots_config->slot_names;
3094 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3095 {
3096 if (strcmp(standby_slot_name, slot_name) == 0)
3097 return true;
3098
3099 standby_slot_name += strlen(standby_slot_name) + 1;
3100 }
3101
3102 return false;
3103}

References i, SyncStandbySlotsConfigData::nslotnames, SyncStandbySlotsConfigData::slot_names, and synchronized_standby_slots_config.

Referenced by PhysicalWakeupLogicalWalSnd().

◆ StandbySlotsHaveCaughtup()

bool StandbySlotsHaveCaughtup ( XLogRecPtr  wait_for_lsn,
int  elevel 
)

Definition at line 3113 of file slot.c.

3114{
3115 const char *name;
3116 int caught_up_slot_num = 0;
3117 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
3118
3119 /*
3120 * Don't need to wait for the standbys to catch up if there is no value in
3121 * synchronized_standby_slots.
3122 */
3124 return true;
3125
3126 /*
3127 * Don't need to wait for the standbys to catch up if we are on a standby
3128 * server, since we do not support syncing slots to cascading standbys.
3129 */
3130 if (RecoveryInProgress())
3131 return true;
3132
3133 /*
3134 * Don't need to wait for the standbys to catch up if they are already
3135 * beyond the specified WAL location.
3136 */
3138 ss_oldest_flush_lsn >= wait_for_lsn)
3139 return true;
3140
3141 /*
3142 * To prevent concurrent slot dropping and creation while filtering the
3143 * slots, take the ReplicationSlotControlLock outside of the loop.
3144 */
3145 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3146
3148 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3149 {
3150 XLogRecPtr restart_lsn;
3151 bool invalidated;
3152 bool inactive;
3153 ReplicationSlot *slot;
3154
3155 slot = SearchNamedReplicationSlot(name, false);
3156
3157 /*
3158 * If a slot name provided in synchronized_standby_slots does not
3159 * exist, report a message and exit the loop.
3160 */
3161 if (!slot)
3162 {
3163 ereport(elevel,
3164 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3165 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3166 name, "synchronized_standby_slots"),
3167 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3168 name),
3169 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3170 name, "synchronized_standby_slots"));
3171 break;
3172 }
3173
3174 /* Same as above: if a slot is not physical, exit the loop. */
3175 if (SlotIsLogical(slot))
3176 {
3177 ereport(elevel,
3178 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3179 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3180 name, "synchronized_standby_slots"),
3181 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3182 name),
3183 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3184 name, "synchronized_standby_slots"));
3185 break;
3186 }
3187
3188 SpinLockAcquire(&slot->mutex);
3189 restart_lsn = slot->data.restart_lsn;
3190 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3191 inactive = slot->active_proc == INVALID_PROC_NUMBER;
3192 SpinLockRelease(&slot->mutex);
3193
3194 if (invalidated)
3195 {
3196 /* Specified physical slot has been invalidated */
3197 ereport(elevel,
3198 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3199 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3200 name, "synchronized_standby_slots"),
3201 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3202 name),
3203 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3204 name, "synchronized_standby_slots"));
3205 break;
3206 }
3207
3208 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3209 {
3210 /* Log a message if no active_pid for this physical slot */
3211 if (inactive)
3212 ereport(elevel,
3213 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3214 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3215 name, "synchronized_standby_slots"),
3216 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3217 name),
3218 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3219 name, "synchronized_standby_slots"));
3220
3221 /* Stop checking further slots: the current slot hasn't caught up. */
3222 break;
3223 }
3224
3225 Assert(restart_lsn >= wait_for_lsn);
3226
3227 if (!XLogRecPtrIsValid(min_restart_lsn) ||
3228 min_restart_lsn > restart_lsn)
3229 min_restart_lsn = restart_lsn;
3230
3231 caught_up_slot_num++;
3232
3233 name += strlen(name) + 1;
3234 }
3235
3236 LWLockRelease(ReplicationSlotControlLock);
3237
3238 /*
3239 * Return false if not all the standbys have caught up to the specified
3240 * WAL location.
3241 */
3242 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3243 return false;
3244
3245 /* The ss_oldest_flush_lsn must not retreat. */
3247 min_restart_lsn >= ss_oldest_flush_lsn);
3248
3249 ss_oldest_flush_lsn = min_restart_lsn;
3250
3251 return true;
3252}

References ReplicationSlot::active_proc, Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errhint(), errmsg, i, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, name, SyncStandbySlotsConfigData::nslotnames, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SearchNamedReplicationSlot(), SyncStandbySlotsConfigData::slot_names, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), ss_oldest_flush_lsn, synchronized_standby_slots_config, and XLogRecPtrIsValid.

Referenced by NeedToWaitForStandbys(), and WaitForStandbyConfirmation().

◆ StartupReplicationSlots()

void StartupReplicationSlots ( void  )

Definition at line 2402 of file slot.c.

2403{
2404 DIR *replication_dir;
2405 struct dirent *replication_de;
2406
2407 elog(DEBUG1, "starting up replication slots");
2408
2409 /* restore all slots by iterating over all on-disk entries */
2410 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2411 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2412 {
2413 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2414 PGFileType de_type;
2415
2416 if (strcmp(replication_de->d_name, ".") == 0 ||
2417 strcmp(replication_de->d_name, "..") == 0)
2418 continue;
2419
2420 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2421 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2422
2423 /* we're only creating directories here, skip if it's not ours */
2424 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2425 continue;
2426
2427 /* we crashed while a slot was being set up or deleted, clean up */
2428 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2429 {
2430 if (!rmtree(path, true))
2431 {
2433 (errmsg("could not remove directory \"%s\"",
2434 path)));
2435 continue;
2436 }
2438 continue;
2439 }
2440
2441 /* looks like a slot in a normal state, restore */
2442 RestoreSlotFromDisk(replication_de->d_name);
2443 }
2444 FreeDir(replication_dir);
2445
2446 /* currently no slots exist, we're done. */
2448 return;
2449
2450 /* Now that we have recovered all the data, compute replication xmin */
2453}

References AllocateDir(), dirent::d_name, DEBUG1, elog, ereport, errmsg, FreeDir(), fsync_fname(), get_dirent_type(), max_repack_replication_slots, max_replication_slots, MAXPGPATH, PG_REPLSLOT_DIR, pg_str_endswith(), PGFILETYPE_DIR, PGFILETYPE_ERROR, ReadDir(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RestoreSlotFromDisk(), rmtree(), snprintf, and WARNING.

Referenced by StartupXLOG().

◆ StaticAssertDecl()

StaticAssertDecl ( lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES+1),
"array length mismatch"
)

◆ validate_sync_standby_slots()

static bool validate_sync_standby_slots ( char *  rawname,
List **  elemlist 
)
static

Definition at line 2975 of file slot.c.

2976{
2977 /* Verify syntax and parse string into a list of identifiers */
2978 if (!SplitIdentifierString(rawname, ',', elemlist))
2979 {
2980 GUC_check_errdetail("List syntax is invalid.");
2981 return false;
2982 }
2983
2984 /* Iterate the list to validate each slot name */
2985 foreach_ptr(char, name, *elemlist)
2986 {
2987 int err_code;
2988 char *err_msg = NULL;
2989 char *err_hint = NULL;
2990
2991 if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2992 &err_msg, &err_hint))
2993 {
2994 GUC_check_errcode(err_code);
2995 GUC_check_errdetail("%s", err_msg);
2996 if (err_hint != NULL)
2997 GUC_check_errhint("%s", err_hint);
2998 return false;
2999 }
3000 }
3001
3002 return true;
3003}

References foreach_ptr, GUC_check_errcode(), GUC_check_errdetail, GUC_check_errhint, name, ReplicationSlotValidateNameInternal(), and SplitIdentifierString().

Referenced by check_synchronized_standby_slots().

◆ WaitForStandbyConfirmation()

void WaitForStandbyConfirmation ( XLogRecPtr  wait_for_lsn)

Definition at line 3261 of file slot.c.

3262{
3263 /*
3264 * Don't need to wait for the standby to catch up if the currently acquired
3265 * slot is not a logical failover slot, or there is no value in
3266 * synchronized_standby_slots.
3267 */
3269 return;
3270
3272
3273 for (;;)
3274 {
3276
3278 {
3279 ConfigReloadPending = false;
3281 }
3282
3283 /* Exit if done waiting for every slot. */
3284 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3285 break;
3286
3287 /*
3288 * Wait for the slots in the synchronized_standby_slots to catch up,
3289 * but use a timeout (1s) so we can also check if the
3290 * synchronized_standby_slots has been changed.
3291 */
3293 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3294 }
3295
3297}

References CHECK_FOR_INTERRUPTS, ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableTimedSleep(), ConfigReloadPending, ReplicationSlot::data, ReplicationSlotPersistentData::failover, MyReplicationSlot, PGC_SIGHUP, ProcessConfigFile(), StandbySlotsHaveCaughtup(), synchronized_standby_slots_config, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtl, and WARNING.

Referenced by LogicalSlotAdvanceAndCheckSnapState(), and pg_logical_slot_get_changes_guts().

Variable Documentation

◆ idle_replication_slot_timeout_secs

int idle_replication_slot_timeout_secs = 0

◆ max_repack_replication_slots

◆ max_replication_slots

◆ MyReplicationSlot

ReplicationSlot* MyReplicationSlot = NULL

Definition at line 158 of file slot.c.

Referenced by abort_logical_decoding_activation(), ApplyLauncherMain(), binary_upgrade_check_logical_slot_pending_wal(), compute_min_nonremovable_xid(), copy_replication_slot(), create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateDecodingContext(), CreateInitDecodingContext(), CreateReplicationSlot(), DisableLogicalDecodingIfNecessary(), EnsureLogicalDecodingEnabled(), init_conflict_slot_xmin(), InvalidatePossiblyObsoleteSlot(), LogicalConfirmReceivedLocation(), LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), logicalrep_worker_launch(), LogicalReplicationSlotCheckPendingWal(), LogicalSlotAdvanceAndCheckSnapState(), NeedToWaitForStandbys(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_physical_replication_slot_advance(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), PhysicalReplicationSlotNewXmin(), PhysicalWakeupLogicalWalSnd(), PostgresMain(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), ReorderBufferAllocate(), ReorderBufferFree(), ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReplicationSlotAcquire(), ReplicationSlotAlter(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDrop(), ReplicationSlotDropAcquired(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotsDropDBSlots(), ReplicationSlotShmemExit(), reserve_wal_for_local_slot(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), StartupDecodingContext(), synchronize_one_slot(), update_and_persist_local_synced_slot(), update_conflict_slot_xmin(), update_local_synced_slot(), update_slotsync_skip_stats(), WaitForStandbyConfirmation(), and WalSndErrorCleanup().

◆ ReplicationSlotCtl

◆ ReplicationSlotsShmemCallbacks

const ShmemCallbacks ReplicationSlotsShmemCallbacks
Initial value:
= {
.request_fn = ReplicationSlotsShmemRequest,
.init_fn = ReplicationSlotsShmemInit,
}

Definition at line 152 of file slot.c.

152 {
153 .request_fn = ReplicationSlotsShmemRequest,
154 .init_fn = ReplicationSlotsShmemInit,
155};

◆ SlotInvalidationCauses

const SlotInvalidationCauseMap SlotInvalidationCauses[]
static
Initial value:
= {
{RS_INVAL_NONE, "none"},
{RS_INVAL_WAL_REMOVED, "wal_removed"},
{RS_INVAL_HORIZON, "rows_removed"},
{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
}

Definition at line 115 of file slot.c.

115 {
116 {RS_INVAL_NONE, "none"},
117 {RS_INVAL_WAL_REMOVED, "wal_removed"},
118 {RS_INVAL_HORIZON, "rows_removed"},
119 {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
120 {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
121};

Referenced by GetSlotInvalidationCause(), and GetSlotInvalidationCauseName().

◆ ss_oldest_flush_lsn

XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr
static

Definition at line 185 of file slot.c.

Referenced by assign_synchronized_standby_slots(), and StandbySlotsHaveCaughtup().

◆ synchronized_standby_slots

char* synchronized_standby_slots

Definition at line 176 of file slot.c.

◆ synchronized_standby_slots_config