PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "common/file_utils.h"
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/subsystems.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/injection_point.h"
#include "utils/varlena.h"
#include "utils/wait_event.h"
Include dependency graph for slot.c:

Go to the source code of this file.

Data Structures

struct  ReplicationSlotOnDisk
 
struct  SyncStandbySlotsConfigData
 
struct  SlotInvalidationCauseMap
 

Macros

#define ReplicationSlotOnDiskConstantSize    offsetof(ReplicationSlotOnDisk, slotdata)
 
#define ReplicationSlotOnDiskNotChecksummedSize    offsetof(ReplicationSlotOnDisk, version)
 
#define ReplicationSlotOnDiskChecksummedSize    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
 
#define ReplicationSlotOnDiskV2Size    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
#define SLOT_MAGIC   0x1051CA1 /* format identifier */
 
#define SLOT_VERSION   5 /* version for new files */
 

Typedefs

typedef struct ReplicationSlotOnDisk ReplicationSlotOnDisk
 
typedef struct SlotInvalidationCauseMap SlotInvalidationCauseMap
 

Functions

 StaticAssertDecl (lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
 
static void ReplicationSlotsShmemRequest (void *arg)
 
static void ReplicationSlotsShmemInit (void *arg)
 
static void ReplicationSlotShmemExit (int code, Datum arg)
 
static bool IsSlotForConflictCheck (const char *name)
 
static void ReplicationSlotDropPtr (ReplicationSlot *slot)
 
static void RestoreSlotFromDisk (const char *name)
 
static void CreateSlotOnDisk (ReplicationSlot *slot)
 
static void SaveSlotToPath (ReplicationSlot *slot, const char *dir, int elevel)
 
void ReplicationSlotInitialize (void)
 
bool ReplicationSlotValidateName (const char *name, bool allow_reserved_name, int elevel)
 
bool ReplicationSlotValidateNameInternal (const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
 
void ReplicationSlotCreate (const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
 
ReplicationSlot *SearchNamedReplicationSlot (const char *name, bool need_lock)
 
int ReplicationSlotIndex (ReplicationSlot *slot)
 
bool ReplicationSlotName (int index, Name name)
 
void ReplicationSlotAcquire (const char *name, bool nowait, bool error_if_invalid)
 
void ReplicationSlotRelease (void)
 
void ReplicationSlotCleanup (bool synced_only)
 
void ReplicationSlotDrop (const char *name, bool nowait)
 
void ReplicationSlotAlter (const char *name, const bool *failover, const bool *two_phase)
 
void ReplicationSlotDropAcquired (bool try_disable)
 
void ReplicationSlotSave (void)
 
void ReplicationSlotMarkDirty (void)
 
void ReplicationSlotPersist (void)
 
void ReplicationSlotsComputeRequiredXmin (bool already_locked)
 
void ReplicationSlotsComputeRequiredLSN (void)
 
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN (void)
 
bool ReplicationSlotsCountDBSlots (Oid dboid, int *nslots, int *nactive)
 
void ReplicationSlotsDropDBSlots (Oid dboid)
 
bool CheckLogicalSlotExists (void)
 
void CheckSlotRequirements (bool repack)
 
void CheckSlotPermissions (void)
 
void ReplicationSlotReserveWal (void)
 
static void ReportSlotInvalidation (ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
 
static bool CanInvalidateIdleSlot (ReplicationSlot *s)
 
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause (uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
 
static bool InvalidatePossiblyObsoleteSlot (uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
 
bool InvalidateObsoleteReplicationSlots (uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
 
void CheckPointReplicationSlots (bool is_shutdown)
 
void StartupReplicationSlots (void)
 
ReplicationSlotInvalidationCause GetSlotInvalidationCause (const char *cause_name)
 
const char *GetSlotInvalidationCauseName (ReplicationSlotInvalidationCause cause)
 
static bool validate_sync_standby_slots (char *rawname, List **elemlist)
 
bool check_synchronized_standby_slots (char **newval, void **extra, GucSource source)
 
void assign_synchronized_standby_slots (const char *newval, void *extra)
 
bool SlotExistsInSyncStandbySlots (const char *slot_name)
 
bool StandbySlotsHaveCaughtup (XLogRecPtr wait_for_lsn, int elevel)
 
void WaitForStandbyConfirmation (XLogRecPtr wait_for_lsn)
 

Variables

static const SlotInvalidationCauseMap SlotInvalidationCauses []
 
ReplicationSlotCtlData *ReplicationSlotCtl = NULL
 
const ShmemCallbacks ReplicationSlotsShmemCallbacks
 
ReplicationSlot *MyReplicationSlot = NULL
 
int max_replication_slots = 10
 
int max_repack_replication_slots = 5
 
int idle_replication_slot_timeout_secs = 0
 
char *synchronized_standby_slots
 
static SyncStandbySlotsConfigData *synchronized_standby_slots_config
 
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr
 

Macro Definition Documentation

◆ ReplicationSlotOnDiskChecksummedSize

#define ReplicationSlotOnDiskChecksummedSize    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize

Definition at line 137 of file slot.c.

152 {
153 .request_fn = ReplicationSlotsShmemRequest,
154 .init_fn = ReplicationSlotsShmemInit,
155};
156
157/* My backend's replication slot in the shared memory array */
159
160/* GUC variables */
161int max_replication_slots = 10; /* the maximum number of replication
162 * slots */
163int max_repack_replication_slots = 5; /* the maximum number of slots
164 * for REPACK */
165
166/*
167 * Invalidate replication slots that have remained idle longer than this
168 * duration; '0' disables it.
169 */
171
172/*
173 * This GUC lists streaming replication standby server slot names that
174 * logical WAL sender processes will wait for.
175 */
177
178/* This is the parsed and cached configuration for synchronized_standby_slots */
180
181/*
182 * Oldest LSN that has been confirmed to be flushed to the standbys
183 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
184 */
186
187static void ReplicationSlotShmemExit(int code, Datum arg);
188static bool IsSlotForConflictCheck(const char *name);
189static void ReplicationSlotDropPtr(ReplicationSlot *slot);
190
191/* internal persistency functions */
192static void RestoreSlotFromDisk(const char *name);
193static void CreateSlotOnDisk(ReplicationSlot *slot);
194static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
195
196/*
197 * Register shared memory space needed for replication slots.
198 */
199static void
201{
202 Size size;
203
205 return;
206
207 size = offsetof(ReplicationSlotCtlData, replication_slots);
208 size = add_size(size,
210 sizeof(ReplicationSlot)));
211 ShmemRequestStruct(.name = "ReplicationSlot Ctl",
212 .size = size,
213 .ptr = (void **) &ReplicationSlotCtl,
214 );
215}
216
217/*
218 * Initialize shared memory for replication slots.
219 */
220static void
222{
224 {
226
227 /* everything else is zeroed by the memset above */
229 SpinLockInit(&slot->mutex);
233 }
234}
235
236/*
237 * Register the callback for replication slot cleanup and releasing.
238 */
239void
241{
243}
244
245/*
246 * Release and cleanup replication slots.
247 */
248static void
250{
251 /* Make sure active replication slots are released */
252 if (MyReplicationSlot != NULL)
254
255 /* Also cleanup all the temporary slots. */
257}
258
259/*
260 * Check whether the passed slot name is valid and report errors at elevel.
261 *
262 * See comments for ReplicationSlotValidateNameInternal().
263 */
264bool
266 int elevel)
267{
268 int err_code;
269 char *err_msg = NULL;
270 char *err_hint = NULL;
271
273 &err_code, &err_msg, &err_hint))
274 {
275 /*
276 * Use errmsg_internal() and errhint_internal() instead of errmsg()
277 * and errhint(), since the messages from
278 * ReplicationSlotValidateNameInternal() are already translated. This
279 * avoids double translation.
280 */
281 ereport(elevel,
283 errmsg_internal("%s", err_msg),
284 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
285
286 pfree(err_msg);
287 if (err_hint != NULL)
289 return false;
290 }
291
292 return true;
293}
294
295/*
296 * Check whether the passed slot name is valid.
297 *
298 * An error will be reported for a reserved replication slot name if
299 * allow_reserved_name is set to false.
300 *
301 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
302 * the name to be used as a directory name on every supported OS.
303 *
304 * Returns true if the slot name is valid. Otherwise, returns false and stores
305 * the error code, error message, and optional hint in err_code, err_msg, and
306 * err_hint, respectively. The caller is responsible for freeing err_msg and
307 * err_hint, which are palloc'd.
308 */
309bool
311 int *err_code, char **err_msg, char **err_hint)
312{
313 const char *cp;
314
315 if (strlen(name) == 0)
316 {
318 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
319 *err_hint = NULL;
320 return false;
321 }
322
323 if (strlen(name) >= NAMEDATALEN)
324 {
326 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
327 *err_hint = NULL;
328 return false;
329 }
330
331 for (cp = name; *cp; cp++)
332 {
333 if (!((*cp >= 'a' && *cp <= 'z')
334 || (*cp >= '0' && *cp <= '9')
335 || (*cp == '_')))
336 {
338 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
339 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
340 return false;
341 }
342 }
343
345 {
347 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
348 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 return false;
351 }
352
353 return true;
354}
355
356/*
357 * Return true if the replication slot name is "pg_conflict_detection".
358 */
359static bool
360IsSlotForConflictCheck(const char *name)
361{
362 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
363}
364
365/*
366 * Create a new replication slot and mark it as used by this backend.
367 *
368 * name: Name of the slot
369 * db_specific: logical decoding is db specific; if the slot is going to
370 * be used for that pass true, otherwise false.
371 * two_phase: If enabled, allows decoding of prepared transactions.
372 * repack: If true, use a slot from the pool for REPACK.
373 * failover: If enabled, allows the slot to be synced to standbys so
374 * that logical replication can be resumed after failover.
375 * synced: True if the slot is synchronized from the primary server.
376 */
377void
378ReplicationSlotCreate(const char *name, bool db_specific,
379 ReplicationSlotPersistency persistency,
380 bool two_phase, bool repack, bool failover, bool synced)
381{
382 ReplicationSlot *slot = NULL;
383 int startpoint,
384 endpoint;
385
387
388 /*
389 * The logical launcher or pg_upgrade may create or migrate an internal
390 * slot, so using a reserved name is allowed in these cases.
391 */
393 ERROR);
394
395 if (failover)
396 {
397 /*
398 * Do not allow users to create the failover enabled slots on the
399 * standby as we do not support sync to the cascading standby.
400 *
401 * However, failover enabled slots can be created during slot
402 * synchronization because we need to retain the same values as the
403 * remote slot.
404 */
408 errmsg("cannot enable failover for a replication slot created on the standby"));
409
410 /*
411 * Do not allow users to create failover enabled temporary slots,
412 * because temporary slots will not be synced to the standby.
413 *
414 * However, failover enabled temporary slots can be created during
415 * slot synchronization. See the comments atop slotsync.c for details.
416 */
417 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
420 errmsg("cannot enable failover for a temporary replication slot"));
421 }
422
423 /*
424 * If some other backend ran this code concurrently with us, we'd likely
425 * both allocate the same slot, and that would be bad. We'd also be at
426 * risk of missing a name collision. Also, we don't want to try to create
427 * a new slot while somebody's busy cleaning up an old one, because we
428 * might both be monkeying with the same directory.
429 */
431
432 /*
433 * Check for name collision (across the whole array), and identify an
434 * allocatable slot (in the array slice specific to our current use case:
435 * either general, or REPACK only). We need to hold
436 * ReplicationSlotControlLock in shared mode for this, so that nobody else
437 * can change the in_use flags while we're looking at them.
438 */
440 startpoint = !repack ? 0 : max_replication_slots;
443 {
445
446 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
449 errmsg("replication slot \"%s\" already exists", name)));
450
451 if (i >= startpoint && i < endpoint &&
452 !s->in_use && slot == NULL)
453 slot = s;
454 }
456
457 /* If all slots are in use, we're out of luck. */
458 if (slot == NULL)
461 errmsg("all replication slots are in use"),
462 errhint("Free one or increase \"%s\".",
463 repack ? "max_repack_replication_slots" : "max_replication_slots")));
464
465 /*
466 * Since this slot is not in use, nobody should be looking at any part of
467 * it other than the in_use field unless they're trying to allocate it.
468 * And since we hold ReplicationSlotAllocationLock, nobody except us can
469 * be doing that. So it's safe to initialize the slot.
470 */
471 Assert(!slot->in_use);
473
474 /* first initialize persistent data */
475 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
476 namestrcpy(&slot->data.name, name);
478 slot->data.persistency = persistency;
479 slot->data.two_phase = two_phase;
481 slot->data.failover = failover;
482 slot->data.synced = synced;
483
484 /* and then data only present in shared memory */
485 slot->just_dirtied = false;
486 slot->dirty = false;
495 slot->inactive_since = 0;
497
498 /*
499 * Create the slot on disk. We haven't actually marked the slot allocated
500 * yet, so no special cleanup is required if this errors out.
501 */
502 CreateSlotOnDisk(slot);
503
504 /*
505 * We need to briefly prevent any other backend from iterating over the
506 * slots while we flip the in_use flag. We also need to set the active
507 * flag while holding the ControlLock as otherwise a concurrent
508 * ReplicationSlotAcquire() could acquire the slot as well.
509 */
511
512 slot->in_use = true;
513
514 /* We can now mark the slot active, and that makes it our slot. */
515 SpinLockAcquire(&slot->mutex);
518 SpinLockRelease(&slot->mutex);
519 MyReplicationSlot = slot;
520
522
523 /*
524 * Create statistics entry for the new logical slot. We don't collect any
525 * stats for physical slots, so no need to create an entry for the same.
526 * See ReplicationSlotDropPtr for why we need to do this before releasing
527 * ReplicationSlotAllocationLock.
528 */
529 if (SlotIsLogical(slot))
531
532 /*
533 * Now that the slot has been marked as in_use and active, it's safe to
534 * let somebody else try to allocate a slot.
535 */
537
538 /* Let everybody know we've modified this slot */
540}
541
542/*
543 * Search for the named replication slot.
544 *
545 * Return the replication slot if found, otherwise NULL.
546 */
549{
550 int i;
551 ReplicationSlot *slot = NULL;
552
553 if (need_lock)
555
557 {
559
560 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
561 {
562 slot = s;
563 break;
564 }
565 }
566
567 if (need_lock)
569
570 return slot;
571}
572
573/*
574 * Return the index of the replication slot in
575 * ReplicationSlotCtl->replication_slots.
576 *
577 * This is mainly useful to have an efficient key for storing replication slot
578 * stats.
579 */
580int
582{
584 slot < ReplicationSlotCtl->replication_slots +
586
588}
589
590/*
591 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
592 * the slot's name and true is returned.
593 *
594 * This likely is only useful for pgstat_replslot.c during shutdown, in other
595 * cases there are obvious TOCTOU issues.
596 */
597bool
599{
600 ReplicationSlot *slot;
601 bool found;
602
604
605 /*
606 * Ensure that the slot cannot be dropped while we copy the name. Don't
607 * need the spinlock as the name of an existing slot cannot change.
608 */
610 found = slot->in_use;
611 if (slot->in_use)
614
615 return found;
616}
617
618/*
619 * Find a previously created slot and mark it as used by this process.
620 *
621 * An error is raised if nowait is true and the slot is currently in use. If
622 * nowait is false, we sleep until the slot is released by the owning process.
623 *
624 * An error is raised if error_if_invalid is true and the slot is found to
625 * be invalid. It should always be set to true, except when we are temporarily
626 * acquiring the slot and don't intend to change it.
627 */
628void
629ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
630{
632 ProcNumber active_proc;
633 int active_pid;
634
635 Assert(name != NULL);
636
637retry:
639
641
642 /* Check if the slot exists with the given name. */
644 if (s == NULL || !s->in_use)
645 {
647
650 errmsg("replication slot \"%s\" does not exist",
651 name)));
652 }
653
654 /*
655 * Do not allow users to acquire the reserved slot. This scenario may
656 * occur if the launcher that owns the slot has terminated unexpectedly
657 * due to an error, and a backend process attempts to reuse the slot.
658 */
662 errmsg("cannot acquire replication slot \"%s\"", name),
663 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
664
665 /*
666 * This is the slot we want; check if it's active under some other
667 * process. In single user mode, we don't need this check.
668 */
670 {
671 /*
672 * Get ready to sleep on the slot in case it is active. (We may end
673 * up not sleeping, but we don't want to do this while holding the
674 * spinlock.)
675 */
676 if (!nowait)
678
679 /*
680 * It is important to reset the inactive_since under spinlock here to
681 * avoid race conditions with slot invalidation. See comments related
682 * to inactive_since in InvalidatePossiblyObsoleteSlot.
683 */
687 active_proc = s->active_proc;
690 }
691 else
692 {
693 s->active_proc = active_proc = MyProcNumber;
695 }
696 active_pid = GetPGProcByNumber(active_proc)->pid;
698
699 /*
700 * If we found the slot but it's already active in another process, we
701 * wait until the owning process signals us that it's been released, or
702 * error out.
703 */
704 if (active_proc != MyProcNumber)
705 {
706 if (!nowait)
707 {
708 /* Wait here until we get signaled, and then restart */
712 goto retry;
713 }
714
717 errmsg("replication slot \"%s\" is active for PID %d",
718 NameStr(s->data.name), active_pid)));
719 }
720 else if (!nowait)
721 ConditionVariableCancelSleep(); /* no sleep needed after all */
722
723 /* We made this slot active, so it's ours now. */
725
726 /*
727 * We need to check for invalidation after making the slot ours to avoid
728 * the possible race condition with the checkpointer that can otherwise
729 * invalidate the slot immediately after the check.
730 */
734 errmsg("can no longer access replication slot \"%s\"",
735 NameStr(s->data.name)),
736 errdetail("This replication slot has been invalidated due to \"%s\".",
738
739 /* Let everybody know we've modified this slot */
741
742 /*
743 * The call to pgstat_acquire_replslot() protects against stats for a
744 * different slot, from before a restart or such, being present during
745 * pgstat_report_replslot().
746 */
747 if (SlotIsLogical(s))
749
750
751 if (am_walsender)
752 {
755 ? errmsg("acquired logical replication slot \"%s\"",
756 NameStr(s->data.name))
757 : errmsg("acquired physical replication slot \"%s\"",
758 NameStr(s->data.name)));
759 }
760}
761
762/*
763 * Release the replication slot that this backend considers to own.
764 *
765 * This or another backend can re-acquire the slot later.
766 * Resources this slot requires will be preserved.
767 */
768void
770{
772 char *slotname = NULL; /* keep compiler quiet */
773 bool is_logical;
774 TimestampTz now = 0;
775
776 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
777
779
780 if (am_walsender)
781 slotname = pstrdup(NameStr(slot->data.name));
782
783 if (slot->data.persistency == RS_EPHEMERAL)
784 {
785 /*
786 * If slot is ephemeral, we drop it upon release, and request logical
787 * decoding be disabled.
788 */
790 }
791 else
792 {
793 /*
794 * If slot needed to temporarily restrain both data and catalog xmin
795 * to create the catalog snapshot, remove that temporary constraint.
796 * Snapshots can only be exported while the initial snapshot is still
797 * acquired.
798 */
799 if (!TransactionIdIsValid(slot->data.xmin) &&
801 {
802 SpinLockAcquire(&slot->mutex);
804 SpinLockRelease(&slot->mutex);
806 }
807
808 /*
809 * Set the time since the slot has become inactive. We get the current
810 * time beforehand to avoid system call while holding the spinlock.
811 */
813
814 if (slot->data.persistency == RS_PERSISTENT)
815 {
816 /*
817 * Mark persistent slot inactive. We're not freeing it, just
818 * disconnecting, but wake up others that may be waiting for it.
819 */
820 SpinLockAcquire(&slot->mutex);
823 SpinLockRelease(&slot->mutex);
825 }
826 else
828
830 }
831
832 /* might not have been set when we've been a plain slot */
837
838 if (am_walsender)
839 {
842 ? errmsg("released logical replication slot \"%s\"",
843 slotname)
844 : errmsg("released physical replication slot \"%s\"",
845 slotname));
846
847 pfree(slotname);
848 }
849}
850
851/*
852 * Cleanup temporary slots created in current session.
853 *
854 * Cleanup only synced temporary slots if 'synced_only' is true, else
855 * cleanup all temporary slots.
856 *
857 * If it drops the last logical slot in the cluster, requests to disable
858 * logical decoding.
859 */
860void
862{
863 int i;
865 bool dropped_logical = false;
866
868
869restart:
873 {
875
876 if (!s->in_use)
877 continue;
878
880
883
884 if ((s->active_proc == MyProcNumber &&
885 (!synced_only || s->data.synced)))
886 {
889 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
890
891 if (SlotIsLogical(s))
892 dropped_logical = true;
893
895
897 goto restart;
898 }
899 else
901 }
902
904
907}
908
909/*
910 * Permanently drop the replication slot identified by the passed-in name.
911 *
912 * If this is a logical slot, request that logical decoding be disabled.
913 */
914void
915ReplicationSlotDrop(const char *name, bool nowait)
916{
917 ReplicationSlotAcquire(name, nowait, false);
918
919 /*
920 * Do not allow users to drop the slots which are currently being synced
921 * from the primary to the standby.
922 */
926 errmsg("cannot drop replication slot \"%s\"", name),
927 errdetail("This replication slot is being synchronized from the primary server."));
928
930}
931
932/*
933 * Change the definition of the slot identified by the specified name.
934 *
935 * Altering the two_phase property of a slot requires caution on the
936 * client-side. Enabling it at any random point during decoding has the
937 * risk that transactions prepared before this change may be skipped by
938 * the decoder, leading to missing prepare records on the client. So, we
939 * enable it for subscription related slots only once the initial tablesync
940 * is finished. See comments atop worker.c. Disabling it is safe only when
941 * there are no pending prepared transaction, otherwise, the changes of
942 * already prepared transactions can be replicated again along with their
943 * corresponding commit leading to duplicate data or errors.
944 */
945void
946ReplicationSlotAlter(const char *name, const bool *failover,
947 const bool *two_phase)
948{
949 bool update_slot = false;
950
953
954 ReplicationSlotAcquire(name, false, true);
955
959 errmsg("cannot use %s with a physical replication slot",
960 "ALTER_REPLICATION_SLOT"));
961
962 if (RecoveryInProgress())
963 {
964 /*
965 * Do not allow users to alter the slots which are currently being
966 * synced from the primary to the standby.
967 */
971 errmsg("cannot alter replication slot \"%s\"", name),
972 errdetail("This replication slot is being synchronized from the primary server."));
973
974 /*
975 * Do not allow users to enable failover on the standby as we do not
976 * support sync to the cascading standby.
977 */
978 if (failover && *failover)
981 errmsg("cannot enable failover for a replication slot"
982 " on the standby"));
983 }
984
985 if (failover)
986 {
987 /*
988 * Do not allow users to enable failover for temporary slots as we do
989 * not support syncing temporary slots to the standby.
990 */
994 errmsg("cannot enable failover for a temporary replication slot"));
995
997 {
1001
1002 update_slot = true;
1003 }
1004 }
1005
1007 {
1011
1012 update_slot = true;
1013 }
1014
1015 if (update_slot)
1016 {
1019 }
1020
1022}
1023
1024/*
1025 * Permanently drop the currently acquired replication slot.
1026 *
1027 * If caller requests it, have checkpointer attempt to disable logical
1028 * decoding. Obviously, this should only be done if the slot is logical.
1029 */
1030void
1032{
1033 ReplicationSlot *slot;
1034
1036 slot = MyReplicationSlot;
1037
1038 /* Can only disable logical decoding if slot is logical */
1039 Assert(!try_disable || SlotIsLogical(slot));
1040
1041 /* slot isn't acquired anymore */
1043
1045
1046 if (try_disable)
1048}
1049
1050/*
1051 * Permanently drop the replication slot which will be released by the point
1052 * this function returns.
1053 */
1054static void
1056{
1057 char path[MAXPGPATH];
1058 char tmppath[MAXPGPATH];
1059
1060 /*
1061 * If some other backend ran this code concurrently with us, we might try
1062 * to delete a slot with a certain name while someone else was trying to
1063 * create a slot with the same name.
1064 */
1066
1067 /* Generate pathnames. */
1068 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1069 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1070
1071 /*
1072 * Rename the slot directory on disk, so that we'll no longer recognize
1073 * this as a valid slot. Note that if this fails, we've got to mark the
1074 * slot inactive before bailing out. If we're dropping an ephemeral or a
1075 * temporary slot, we better never fail hard as the caller won't expect
1076 * the slot to survive and this might get called during error handling.
1077 */
1078 if (rename(path, tmppath) == 0)
1079 {
1080 /*
1081 * We need to fsync() the directory we just renamed and its parent to
1082 * make sure that our changes are on disk in a crash-safe fashion. If
1083 * fsync() fails, we can't be sure whether the changes are on disk or
1084 * not. For now, we handle that by panicking;
1085 * StartupReplicationSlots() will try to straighten it out after
1086 * restart.
1087 */
1089 fsync_fname(tmppath, true);
1092 }
1093 else
1094 {
1095 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1096
1097 SpinLockAcquire(&slot->mutex);
1099 SpinLockRelease(&slot->mutex);
1100
1101 /* wake up anyone waiting on this slot */
1103
1106 errmsg("could not rename file \"%s\" to \"%s\": %m",
1107 path, tmppath)));
1108 }
1109
1110 /*
1111 * The slot is definitely gone. Lock out concurrent scans of the array
1112 * long enough to kill it. It's OK to clear the active PID here without
1113 * grabbing the mutex because nobody else can be scanning the array here,
1114 * and nobody can be attached to this slot and thus access it without
1115 * scanning the array.
1116 *
1117 * Also wake up processes waiting for it.
1118 */
1121 slot->in_use = false;
1124
1125 /*
1126 * Slot is dead and doesn't prevent resource removal anymore, recompute
1127 * limits.
1128 */
1131
1132 /*
1133 * If removing the directory fails, the worst thing that will happen is
1134 * that the user won't be able to create a new slot with the same name
1135 * until the next server restart. We warn about it, but that's all.
1136 */
1137 if (!rmtree(tmppath, true))
1139 (errmsg("could not remove directory \"%s\"", tmppath)));
1140
1141 /*
1142 * Drop the statistics entry for the replication slot. Do this while
1143 * holding ReplicationSlotAllocationLock so that we don't drop a
1144 * statistics entry for another slot with the same name just created in
1145 * another session.
1146 */
1147 if (SlotIsLogical(slot))
1149
1150 /*
1151 * We release this at the very end, so that nobody starts trying to create
1152 * a slot while we're still cleaning up the detritus of the old one.
1153 */
1155}
1156
1157/*
1158 * Serialize the currently acquired slot's state from memory to disk, thereby
1159 * guaranteeing the current state will survive a crash.
1160 */
1161void
1163{
1164 char path[MAXPGPATH];
1165
1167
1170}
1171
1172/*
1173 * Signal that it would be useful if the currently acquired slot would be
1174 * flushed out to disk.
1175 *
1176 * Note that the actual flush to disk can be delayed for a long time, if
1177 * required for correctness explicitly do a ReplicationSlotSave().
1178 */
1179void
1181{
1183
1185
1186 SpinLockAcquire(&slot->mutex);
1188 MyReplicationSlot->dirty = true;
1189 SpinLockRelease(&slot->mutex);
1190}
1191
1192/*
1193 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1194 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1195 */
1196void
1198{
1200
1201 Assert(slot != NULL);
1203
1204 SpinLockAcquire(&slot->mutex);
1206 SpinLockRelease(&slot->mutex);
1207
1210}
1211
1212/*
1213 * Compute the oldest xmin across all slots and store it in the ProcArray.
1214 *
1215 * If already_locked is true, both the ReplicationSlotControlLock and the
1216 * ProcArrayLock have already been acquired exclusively. It is crucial that the
1217 * caller first acquires the ReplicationSlotControlLock, followed by the
1218 * ProcArrayLock, to prevent any undetectable deadlocks since this function
1219 * acquires them in that order.
 *
 * Slots that are not in use or have been invalidated are ignored. The data
 * xmin and the catalog xmin are aggregated separately. Each keeps the oldest
 * valid value seen (as ordered by TransactionIdPrecedes()).
1220 */
1221void
1223{
1224 int i;
1227
1232
1233 /*
1234 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1235 * values, so no backend updates the initial xmin for a newly created slot
1236 * concurrently. A shared lock is used here to minimize lock contention,
1237 * especially when many slots exist and advancements occur frequently.
1238 * This is safe since an exclusive lock is taken during initial slot xmin
1239 * update in slot creation.
1240 *
1241 * One might think that we can hold the ProcArrayLock exclusively and
1242 * update the slot xmin values, but it could increase lock contention on
1243 * the ProcArrayLock, which is not great since this function can be called
1244 * at non-negligible frequency.
1245 *
1246 * Concurrent invocation of this function may cause the computed slot xmin
1247 * to regress. However, this is harmless because tuples prior to the most
1248 * recent xmin are no longer useful once advancement occurs (see
1249 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1250 * before updating the effective_xmin). Thus, such regression merely
1251 * prevents VACUUM from prematurely removing tuples without causing the
1252 * early deletion of required data.
1253 */
1254 if (!already_locked)
1256
1258 {
1260 TransactionId effective_xmin;
1261 TransactionId effective_catalog_xmin;
1262 bool invalidated;
1263
1264 if (!s->in_use)
1265 continue;
1266
1268 effective_xmin = s->effective_xmin;
1269 effective_catalog_xmin = s->effective_catalog_xmin;
1270 invalidated = s->data.invalidated != RS_INVAL_NONE;
1272
1273 /* invalidated slots need not apply */
1274 if (invalidated)
1275 continue;
1276
1277 /* check the data xmin; keep the older of this slot's and the aggregate */
1278 if (TransactionIdIsValid(effective_xmin) &&
1280 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1281 agg_xmin = effective_xmin;
1282
1283 /* check the catalog xmin; same rule as above */
1284 if (TransactionIdIsValid(effective_catalog_xmin) &&
1286 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1287 agg_catalog_xmin = effective_catalog_xmin;
1288 }
1289
1291
1292 if (!already_locked)
1294}
1295
1296/*
1297 * Compute the oldest restart LSN across all slots and inform the xlog module.
1298 *
1299 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1300 * purpose, we don't try to account for that, because this module doesn't
1301 * know what to compare against.
 *
 * Unused and invalidated slots are ignored. For persistent slots, the older
 * of restart_lsn and a valid last_saved_restart_lsn is used. The minimum is
 * taken over valid LSNs only.
1302 */
1303void
1305{
1306 int i;
1308
1310
1313 {
1315 XLogRecPtr restart_lsn;
1316 XLogRecPtr last_saved_restart_lsn;
1317 bool invalidated;
1318 ReplicationSlotPersistency persistency;
1319
1320 if (!s->in_use)
1321 continue;
1322
1324 persistency = s->data.persistency;
1325 restart_lsn = s->data.restart_lsn;
1326 invalidated = s->data.invalidated != RS_INVAL_NONE;
1327 last_saved_restart_lsn = s->last_saved_restart_lsn;
1329
1330 /* invalidated slots need not apply */
1331 if (invalidated)
1332 continue;
1333
1334 /*
1335 * For persistent slots, use last_saved_restart_lsn to compute the
1336 * oldest LSN for removal of WAL segments. The segments between
1337 * last_saved_restart_lsn and restart_lsn might be needed by a
1338 * persistent slot in the case of a database crash. Non-persistent
1339 * slots can't survive a database crash, so we don't care about
1340 * last_saved_restart_lsn for them.
1341 */
1342 if (persistency == RS_PERSISTENT)
1343 {
1344 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1345 restart_lsn > last_saved_restart_lsn)
1346 {
1347 restart_lsn = last_saved_restart_lsn;
1348 }
1349 }
1350
1351 if (XLogRecPtrIsValid(restart_lsn) &&
1353 restart_lsn < min_required))
1354 min_required = restart_lsn;
1355 }
1357
1359}
1360
1361/*
1362 * Compute the oldest WAL LSN required by *logical* decoding slots.
1363 *
1364 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1365 * slots exist.
1366 *
1367 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1368 * ignores physical replication slots.
1369 *
1370 * The results aren't required frequently, so we don't maintain a precomputed
1371 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 *
 * Invalidated slots and slots without a valid restart LSN are skipped. For
 * persistent slots, the older of restart_lsn and last_saved_restart_lsn is
 * used, as in ReplicationSlotsComputeRequiredLSN().
1372 */
1375{
1377 int i;
1378
1380 return InvalidXLogRecPtr;
1381
1383
1385 {
1386 ReplicationSlot *s;
1387 XLogRecPtr restart_lsn;
1388 XLogRecPtr last_saved_restart_lsn;
1389 bool invalidated;
1390 ReplicationSlotPersistency persistency;
1391
1393
1394 /* cannot change while ReplicationSlotControlLock is held */
1395 if (!s->in_use)
1396 continue;
1397
1398 /* we're only interested in logical slots */
1399 if (!SlotIsLogical(s))
1400 continue;
1401
1402 /* read once, it's ok if it increases while we're checking */
1404 persistency = s->data.persistency;
1405 restart_lsn = s->data.restart_lsn;
1406 invalidated = s->data.invalidated != RS_INVAL_NONE;
1407 last_saved_restart_lsn = s->last_saved_restart_lsn;
1409
1410 /* invalidated slots need not apply */
1411 if (invalidated)
1412 continue;
1413
1414 /*
1415 * For persistent slots, use last_saved_restart_lsn to compute the
1416 * oldest LSN for removal of WAL segments. The segments between
1417 * last_saved_restart_lsn and restart_lsn might be needed by a
1418 * persistent slot in the case of a database crash. Non-persistent
1419 * slots can't survive a database crash, so we don't care about
1420 * last_saved_restart_lsn for them.
1421 */
1422 if (persistency == RS_PERSISTENT)
1423 {
1424 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1425 restart_lsn > last_saved_restart_lsn)
1426 {
1427 restart_lsn = last_saved_restart_lsn;
1428 }
1429 }
1430
1431 if (!XLogRecPtrIsValid(restart_lsn))
1432 continue;
1433
1434 if (!XLogRecPtrIsValid(result) ||
1435 restart_lsn < result)
1436 result = restart_lsn;
1437 }
1438
1440
1441 return result;
1442}
1443
1444/*
1445 * ReplicationSlotsCountDBSlots -- count the number of logical slots that refer
1446 * to the passed database oid.
1447 *
1448 * Returns true if there are any slots referencing the database. *nslots is
1449 * set to the total number of such slots (invalidated ones included), and
1450 * *nactive to the number of them currently active. Both outputs are zeroed
 * before counting, so they are valid even on an early "false" return.
1451 */
1452bool
1453ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1454{
1455 int i;
1456
1457 *nslots = *nactive = 0;
1458
1460 return false;
1461
1464 {
1465 ReplicationSlot *s;
1466
1468
1469 /* cannot change while ReplicationSlotControlLock is held */
1470 if (!s->in_use)
1471 continue;
1472
1473 /* only logical slots are database specific, skip */
1474 if (!SlotIsLogical(s))
1475 continue;
1476
1477 /* not our database, skip */
1478 if (s->data.database != dboid)
1479 continue;
1480
1481 /* NB: intentionally counting invalidated slots */
1482
1483 /* count slots with spinlock held */
1485 (*nslots)++;
1487 (*nactive)++;
1489 }
1491
1492 if (*nslots > 0)
1493 return true;
1494 return false;
1495}
1496
1497/*
1498 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1499 * passed database oid. The caller should hold an exclusive lock on the
1500 * pg_database oid for the database to prevent creation of new slots on the db
1501 * or replay from existing slots.
1502 *
1503 * Another session that concurrently acquires an existing slot on the target DB
1504 * (most likely to drop it) may cause this function to ERROR. If that happens
1505 * it may have dropped some but not all slots.
1506 *
1507 * This routine isn't as efficient as it could be - but we don't drop
1508 * databases often, especially databases with lots of slots.
1509 *
1510 * If the last logical slot in the cluster is dropped, request to disable
1511 * logical decoding.
1512 */
1513void
1515{
1516 int i;
1518 bool dropped = false;
1519
1521 return;
1522
1523restart:
1527 {
1528 ReplicationSlot *s;
1529 char *slotname;
1530 ProcNumber active_proc;
1531
1533
1534 /* cannot change while ReplicationSlotControlLock is held */
1535 if (!s->in_use)
1536 continue;
1537
1538 /* only logical slots are database specific, skip */
1539 if (!SlotIsLogical(s))
1540 continue;
1541
1542 /*
1543 * Check logical slots on other databases too so we can disable
1544 * logical decoding only if no valid logical slots remain in the cluster.
1545 */
1549
1550 /* not our database, skip */
1551 if (s->data.database != dboid)
1552 continue;
1553
1554 /* NB: intentionally including invalidated slots to drop */
1555
1556 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1558 /* can't change while ReplicationSlotControlLock is held */
1559 slotname = NameStr(s->data.name);
1560 active_proc = s->active_proc;
1561 if (active_proc == INVALID_PROC_NUMBER)
1562 {
1565 }
1567
1568 /*
1569 * Even though we hold an exclusive lock on the database object, a
1570 * logical slot for that DB can still be active, e.g. if it's
1571 * concurrently being dropped by a backend connected to another DB.
1572 *
1573 * That's fairly unlikely in practice, so we'll just bail out.
1574 *
1575 * The slot sync worker holds a shared lock on the database before
1576 * operating on synced logical slots to avoid conflict with the drop
1577 * happening here. The persistent synced slots are thus safe but there
1578 * is a possibility that the slot sync worker has created a temporary
1579 * slot (which stays active even on release) and we are trying to drop
1580 * that here. In practice, the chances of hitting this scenario are
1581 * low because, during slot synchronization, the temporary slot is
1582 * immediately converted to persistent and is then safe due to the
1583 * shared lock taken on the database. So, we'll just bail out in such
1584 * a case.
1585 *
1586 * XXX: We can consider shutting down the slot sync worker before
1587 * trying to drop synced temporary slots here.
1588 */
1589 if (active_proc != INVALID_PROC_NUMBER)
1590 ereport(ERROR,
1592 errmsg("replication slot \"%s\" is active for PID %d",
1593 slotname, GetPGProcByNumber(active_proc)->pid)));
1594
1595 /*
1596 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1597 * holding ReplicationSlotControlLock over filesystem operations,
1598 * release ReplicationSlotControlLock and use
1599 * ReplicationSlotDropAcquired.
1600 *
1601 * As that means the set of slots could change, restart scan from the
1602 * beginning each time we release the lock.
1603 */
1606 dropped = true;
1607 goto restart;
1608 }
1610
1611 if (dropped && !found_valid_logicalslot)
1613}
1614
1615/*
1616 * Returns true if there is at least one in-use valid logical replication slot.
 *
 * Physical slots and invalidated slots are ignored; the scan stops at the
 * first qualifying slot.
1617 */
1618bool
1620{
1621 bool found = false;
1622
1624 return false;
1625
1628 {
1629 ReplicationSlot *s;
1630 bool invalidated;
1631
1633
1634 /* cannot change while ReplicationSlotControlLock is held */
1635 if (!s->in_use)
1636 continue;
1637
 /* only logical slots are of interest */
1638 if (SlotIsPhysical(s))
1639 continue;
1640
1642 invalidated = s->data.invalidated != RS_INVAL_NONE;
1644
1645 if (invalidated)
1646 continue;
1647
1648 found = true;
1649 break;
1650 }
1652
1653 return found;
1654}
1655
1656/*
1657 * Check whether the server's configuration supports using replication
1658 * slots.
 *
 * Raises an ERROR if a requirement is not met:
 * - max_replication_slots must be > 0 (not checked when repack is true);
 * - max_repack_replication_slots must be > 0 (presumably checked only for
 *   REPACK; the guarding condition is not visible in this listing);
 * - wal_level must be >= replica.
1659 */
1660void
1662{
1663 /*
1664 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1665 * needs the same check.
1666 */
1667
1668 if (!repack && max_replication_slots == 0)
1669 ereport(ERROR,
1671 errmsg("replication slots can only be used if \"%s\" > 0",
1672 "max_replication_slots"));
1673
1675 ereport(ERROR,
1677 errmsg("REPACK can only be used if \"%s\" > 0",
1678 "max_repack_replication_slots"));
1679
1681 ereport(ERROR,
1683 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1684}
1685
1686/*
1687 * Check whether the user has privilege to use replication slots.
 *
 * Raises an ERROR ("permission denied to use replication slots") when the
 * check fails. Per the error detail, only roles with the REPLICATION
 * attribute may use replication slots.
1688 */
1689void
1691{
1693 ereport(ERROR,
1695 errmsg("permission denied to use replication slots"),
1696 errdetail("Only roles with the %s attribute may use replication slots.",
1697 "REPLICATION")));
1698}
1699
1700/*
1701 * Reserve WAL for the currently active slot.
1702 *
1703 * Compute and set restart_lsn in a manner that's appropriate for the type of
1704 * the slot and concurrency safe.
 *
 * Raises an ERROR if the WAL segment containing the chosen restart_lsn has
 * already been removed (checked against XLogGetLastRemovedSegno()).
1705 */
1706void
1708{
1710 XLogSegNo segno;
1711 XLogRecPtr restart_lsn;
1712
1713 Assert(slot != NULL);
1716
1717 /*
1718 * The replication slot mechanism is used to prevent the removal of
1719 * required WAL.
1720 *
1721 * Acquire an exclusive lock to prevent the checkpoint process from
1722 * concurrently computing the minimum slot LSN (see
1723 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1724 * replication cannot be removed during a checkpoint.
1725 *
1726 * The mechanism is reliable because if WAL reservation occurs first, the
1727 * checkpoint must wait for the restart_lsn update before determining the
1728 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1729 * first, subsequent WAL reservations will select positions at or beyond
1730 * the redo pointer of that checkpoint.
1731 */
1733
1734 /*
1735 * For logical slots, log a standby snapshot and start logical decoding at
1736 * exactly that position. That allows the slot to start up more quickly.
1737 * But on a standby we cannot do WAL writes, so just use the replay
1738 * pointer; effectively, an attempt to create a logical slot on standby
1739 * will cause it to wait for an xl_running_xact record to be logged
1740 * independently on the primary, so that a snapshot can be built using the
1741 * record.
1742 *
1743 * None of this is needed (or indeed helpful) for physical slots as
1744 * they'll start replay at the last logged checkpoint anyway. Instead,
1745 * use the location of the last redo LSN, where a base backup has to
1746 * start replay.
1747 */
1748 if (SlotIsPhysical(slot))
1749 restart_lsn = GetRedoRecPtr();
1750 else if (RecoveryInProgress())
1751 restart_lsn = GetXLogReplayRecPtr(NULL);
1752 else
1753 restart_lsn = GetXLogInsertRecPtr();
1754
1755 SpinLockAcquire(&slot->mutex);
1756 slot->data.restart_lsn = restart_lsn;
1757 SpinLockRelease(&slot->mutex);
1758
1759 /* prevent WAL removal as fast as possible */
1761
1762 /* Checkpoint shouldn't remove the required WAL. */
1764 if (XLogGetLastRemovedSegno() >= segno)
1765 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1766 NameStr(slot->data.name));
1767
1769
1770 if (!RecoveryInProgress() && SlotIsLogical(slot))
1771 {
1773
1774 /* make sure we have enough information to start */
1776
1777 /* and make sure it's fsynced to disk */
1779 }
1780}
1781
1782/*
1783 * Report that a replication slot needs to be invalidated.
 *
 * Emits a LOG message. When terminating is true, the message says that
 * process pid is being terminated to release the slot; otherwise it says the
 * slot is being invalidated. A cause-specific errdetail is attached. For WAL
 * removal and idle timeout, an errhint names the GUC that might need to be
 * increased.
1784 */
1785static void
1787 bool terminating,
1788 int pid,
1789 NameData slotname,
1790 XLogRecPtr restart_lsn,
1792 TransactionId snapshotConflictHorizon,
1793 long slot_idle_seconds)
1794{
1797
1800
1801 switch (cause)
1802 {
1804 {
1805 uint64 ex = oldestLSN - restart_lsn;
1806
1808 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1809 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1810 ex),
1811 LSN_FORMAT_ARGS(restart_lsn),
1812 ex);
1813 /* translator: %s is a GUC variable name */
1814 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1815 "max_slot_wal_keep_size");
1816 break;
1817 }
1818 case RS_INVAL_HORIZON:
1819 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1820 snapshotConflictHorizon);
1821 break;
1822
1823 case RS_INVAL_WAL_LEVEL:
1824 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1825 break;
1826
1828 {
1829 /* translator: %s is a GUC variable name */
1830 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1831 slot_idle_seconds, "idle_replication_slot_timeout",
1833 /* translator: %s is a GUC variable name */
1834 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1835 "idle_replication_slot_timeout");
1836 break;
1837 }
1838 case RS_INVAL_NONE:
1840 }
1841
1842 ereport(LOG,
1843 terminating ?
1844 errmsg("terminating process %d to release replication slot \"%s\"",
1845 pid, NameStr(slotname)) :
1846 errmsg("invalidating obsolete replication slot \"%s\"",
1847 NameStr(slotname)),
1849 err_hint.len ? errhint("%s", err_hint.data) : 0);
1850
1851 pfree(err_detail.data);
1852 pfree(err_hint.data);
1853}
1854
1855/*
1856 * Can we invalidate an idle replication slot?
1857 *
1858 * Idle timeout invalidation is allowed only when:
1859 *
1860 * 1. Idle timeout is set
1861 * 2. Slot has reserved WAL
1862 * 3. Slot is inactive (inactive_since > 0)
1863 * 4. The slot is not being synced from the primary while the server is in
1864 * recovery. Synced slots are always considered to be inactive, since
1865 * they don't perform logical decoding to produce changes.
1866 */
1867static inline bool
1869{
1872 s->inactive_since > 0 &&
1873 !(RecoveryInProgress() && s->data.synced));
1874}
1875
1876/*
1877 * DetermineSlotInvalidationCause - Determine the cause for which a slot
1878 * becomes invalid among the given possible causes.
1879 *
1880 * This function sequentially checks all possible invalidation causes and
1881 * returns the first one for which the slot is eligible for invalidation.
 * The checks run in this order: WAL removal, xid horizon, wal_level, then
 * idle timeout. Returns RS_INVAL_NONE if no cause applies.
 *
 * When RS_INVAL_IDLE_TIMEOUT is returned, *inactive_since is set to the
 * slot's inactive_since. It is set to 0 instead when the timeout is simulated
 * via the "slot-timeout-inval" injection point. "now" must be > 0 whenever
 * the idle-timeout check runs (asserted).
1882 */
1885 XLogRecPtr oldestLSN, Oid dboid,
1886 TransactionId snapshotConflictHorizon,
1887 TimestampTz *inactive_since, TimestampTz now)
1888{
1890
1892 {
1893 XLogRecPtr restart_lsn = s->data.restart_lsn;
1894
1895 if (XLogRecPtrIsValid(restart_lsn) &&
1896 restart_lsn < oldestLSN)
1897 return RS_INVAL_WAL_REMOVED;
1898 }
1899
1901 {
1902 /* invalid DB oid signals a shared relation */
1903 if (SlotIsLogical(s) &&
1904 (dboid == InvalidOid || dboid == s->data.database))
1905 {
1906 TransactionId effective_xmin = s->effective_xmin;
1908
1909 if (TransactionIdIsValid(effective_xmin) &&
1910 TransactionIdPrecedesOrEquals(effective_xmin,
1911 snapshotConflictHorizon))
1912 return RS_INVAL_HORIZON;
1915 snapshotConflictHorizon))
1916 return RS_INVAL_HORIZON;
1917 }
1918 }
1919
1921 {
1922 if (SlotIsLogical(s))
1923 return RS_INVAL_WAL_LEVEL;
1924 }
1925
1927 {
1928 Assert(now > 0);
1929
1930 if (CanInvalidateIdleSlot(s))
1931 {
1932 /*
1933 * Simulate the invalidation due to idle_timeout to test the
1934 * timeout behavior promptly, without waiting for it to trigger
1935 * naturally.
1936 */
1937#ifdef USE_INJECTION_POINTS
1938 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1939 {
1940 *inactive_since = 0; /* since the beginning of time */
1941 return RS_INVAL_IDLE_TIMEOUT;
1942 }
1943#endif
1944
1945 /*
1946 * Check if the slot needs to be invalidated due to
1947 * idle_replication_slot_timeout GUC.
1948 */
1951 {
1952 *inactive_since = s->inactive_since;
1953 return RS_INVAL_IDLE_TIMEOUT;
1954 }
1955 }
1956 }
1957
1958 return RS_INVAL_NONE;
1959}
1960
1961/*
1962 * Helper for InvalidateObsoleteReplicationSlots
1963 *
1964 * Acquires the given slot and marks it invalid, if necessary and possible.
1965 *
1966 * Returns true if the slot was invalidated.
1967 *
1968 * Set *released_lock_out if ReplicationSlotControlLock was released in the
1969 * interim (and in that case we're not holding the lock at return, otherwise
1970 * we are).
1971 *
1972 * This is inherently racy, because we release the LWLock
1973 * for syscalls, so caller must restart if we return true.
1974 */
1975static bool
1977 ReplicationSlot *s,
1979 Oid dboid, TransactionId snapshotConflictHorizon,
1980 bool *released_lock_out)
1981{
1982 int last_signaled_pid = 0;
1983 bool released_lock = false;
1984 bool invalidated = false;
1985 TimestampTz inactive_since = 0;
1986
1987 for (;;)
1988 {
1989 XLogRecPtr restart_lsn;
1990 NameData slotname;
1991 ProcNumber active_proc;
1992 int active_pid = 0;
1994 TimestampTz now = 0;
1995 long slot_idle_secs = 0;
1996
1998
1999 if (!s->in_use)
2000 {
2001 if (released_lock)
2003 break;
2004 }
2005
2007 {
2008 /*
2009 * Assign the current time here to avoid system call overhead
2010 * while holding the spinlock in subsequent code.
2011 */
2013 }
2014
2015 /*
2016 * Check if the slot needs to be invalidated. If it needs to be
2017 * invalidated, and is not currently acquired, acquire it and mark it
2018 * as having been invalidated. We do this with the spinlock held to
2019 * avoid race conditions -- for example the restart_lsn could move
2020 * forward, or the slot could be dropped.
2021 */
2023
2024 restart_lsn = s->data.restart_lsn;
2025
2026 /* we do nothing if the slot is already invalid */
2027 if (s->data.invalidated == RS_INVAL_NONE)
2029 s, oldestLSN,
2030 dboid,
2031 snapshotConflictHorizon,
2032 &inactive_since,
2033 now);
2034
2035 /* if there's no invalidation, we're done */
2037 {
2039 if (released_lock)
2041 break;
2042 }
2043
2044 slotname = s->data.name;
2045 active_proc = s->active_proc;
2046
2047 /*
2048 * If the slot can be acquired, do so and mark it invalidated
2049 * immediately. Otherwise we'll signal the owning process, below, and
2050 * retry.
2051 *
2052 * Note: Unlike other slot attributes, slot's inactive_since can't be
2053 * changed until the acquired slot is released or the owning process
2054 * is terminated. So, the inactive slot can only be invalidated
2055 * immediately without being terminated.
2056 */
2057 if (active_proc == INVALID_PROC_NUMBER)
2058 {
2062
2063 /*
2064 * XXX: We should consider not overwriting restart_lsn and instead
2065 * just rely on .invalidated.
2066 */
2068 {
2071 }
2072
2073 /* Let caller know */
2074 invalidated = true;
2075 }
2076 else
2077 {
2078 active_pid = GetPGProcByNumber(active_proc)->pid;
2079 Assert(active_pid != 0);
2080 }
2081
2083
2084 /*
2085 * Calculate the idle time duration of the slot if the slot is marked
2086 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2087 */
2089 {
2090 int slot_idle_usecs;
2091
2092 TimestampDifference(inactive_since, now, &slot_idle_secs,
2094 }
2095
2096 if (active_proc != INVALID_PROC_NUMBER)
2097 {
2098 /*
2099 * Prepare the sleep on the slot's condition variable before
2100 * releasing the lock, to close a possible race condition if the
2101 * slot is released before the sleep below.
2102 */
2104
2106 released_lock = true;
2107
2108 /*
2109 * Signal to terminate the process that owns the slot, if we
2110 * haven't already signalled it. (Avoidance of repeated
2111 * signalling is the only reason for there to be a loop in this
2112 * routine; otherwise we could rely on caller's restart loop.)
2113 *
2114 * There is a race condition in which another process may acquire
2115 * the slot after its current owner is terminated and before this
2116 * process acquires it. To handle that, we signal only if the PID of
2117 * the owning process has changed from the previous time. (This
2118 * logic assumes that the same PID is not reused very quickly.)
2119 */
2121 {
2123 slotname, restart_lsn,
2124 oldestLSN, snapshotConflictHorizon,
2126
2127 if (MyBackendType == B_STARTUP)
2129 active_pid,
2131 else
2132 (void) kill(active_pid, SIGTERM);
2133
2135 }
2136
2137 /* Wait until the slot is released. */
2140
2141 /*
2142 * Re-acquire lock and start over; we expect to invalidate the
2143 * slot next time (unless another process acquires the slot in the
2144 * meantime).
2145 *
2146 * Note: It is possible for a slot to advance its restart_lsn or
2147 * xmin values sufficiently between when we release the mutex and
2148 * when we recheck, moving from a conflicting state to a non
2149 * conflicting state. This is intentional and safe: if the slot
2150 * has caught up while we're busy here, the resources we were
2151 * concerned about (WAL segments or tuples) have not yet been
2152 * removed, and there's no reason to invalidate the slot.
2153 */
2155 continue;
2156 }
2157 else
2158 {
2159 /*
2160 * We hold the slot now and have already invalidated it; flush it
2161 * to ensure that state persists.
2162 *
2163 * Don't want to hold ReplicationSlotControlLock across file
2164 * system operations, so release it now but be sure to tell caller
2165 * to restart from scratch.
2166 */
2168 released_lock = true;
2169
2170 /* Make sure the invalidated state persists across server restart */
2174
2176 slotname, restart_lsn,
2177 oldestLSN, snapshotConflictHorizon,
2179
2180 /* done with this slot for now */
2181 break;
2182 }
2183 }
2184
2186
2188 return invalidated;
2189}
2190
2191/*
2192 * Invalidate slots that require resources about to be removed.
2193 *
2194 * Returns true if any slot has been invalidated.
2195 *
2196 * Whether a slot needs to be invalidated depends on the invalidation cause.
2197 * A slot is invalidated if it:
2198 * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
2199 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2200 * db; dboid may be InvalidOid for shared relations
2201 * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2202 * logical.
2203 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2204 * "idle_replication_slot_timeout" duration.
2205 *
2206 * Note: This function attempts to invalidate the slot for multiple possible
2207 * causes in a single pass, minimizing redundant iterations. The "possible_causes"
2208 * parameter is a bitmask of one or more of the defined causes.
2209 *
2210 * If it invalidates the last logical slot in the cluster, it requests to
2211 * disable logical decoding.
2212 *
2213 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2214 */
2215bool
2217 XLogSegNo oldestSegno, Oid dboid,
2218 TransactionId snapshotConflictHorizon)
2219{
2221 bool invalidated = false;
2222 bool invalidated_logical = false;
2224
2225 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2228
2230 return invalidated;
2231
2233
2234restart:
2238 {
2240 bool released_lock = false;
2241
2242 if (!s->in_use)
2243 continue;
2244
2245 /* Prevent invalidation of logical slots during binary upgrade */
2247 {
2251
2252 continue;
2253 }
2254
2256 dboid, snapshotConflictHorizon,
2257 &released_lock))
2258 {
2260
2261 /* Remember we have invalidated a physical or logical slot */
2262 invalidated = true;
2263
2264 /*
2265 * Additionally, remember we have invalidated a logical slot as we
2266 * can request disabling logical decoding later.
2267 */
2268 if (SlotIsLogical(s))
2269 invalidated_logical = true;
2270 }
2271 else
2272 {
2273 /*
2274 * We need to check if the slot is invalidated here since
2275 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2276 * is already invalidated.
2277 */
2282 }
2283
2284 /* if the lock was released, start from scratch */
2285 if (released_lock)
2286 goto restart;
2287 }
2289
2290 /*
2291 * If any slots have been invalidated, recalculate the resource limits.
2292 */
2293 if (invalidated)
2294 {
2297 }
2298
2299 /*
2300 * Request the checkpointer to disable logical decoding if no valid
2301 * logical slots remain. If called by the checkpointer during a
2302 * checkpoint, only the request is initiated; actual deactivation is
2303 * deferred until after the checkpoint completes.
2304 */
2307
2308 return invalidated;
2309}
2310
2311/*
2312 * Flush all replication slots to disk.
2313 *
2314 * It is convenient to flush dirty replication slots at the time of checkpoint.
2315 * Additionally, in case of a shutdown checkpoint, we also identify the slots
2316 * for which the confirmed_flush LSN has been updated since the last time it
2317 * was saved and flush them.
 *
 * Write failures are reported at LOG level rather than ERROR, because
 * SaveSlotToPath() is called with elevel LOG below.
2318 */
2319void
2320CheckPointReplicationSlots(bool is_shutdown)
2321{
2322 int i;
2323 bool last_saved_restart_lsn_updated = false;
2324
2325 elog(DEBUG1, "performing replication slot checkpoint");
2326
2327 /*
2328 * Prevent any slot from being created/dropped while we're active. As we
2329 * explicitly do *not* want to block iterating over replication_slots or
2330 * acquiring a slot we cannot take the control lock - but that's OK,
2331 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2332 * enough to guarantee that nobody can change the in_use bits on us.
2333 *
2334 * Additionally, acquiring the Allocation lock is necessary to serialize
2335 * the slot flush process with concurrent slot WAL reservation. This
2336 * ensures that the WAL position being reserved is either flushed to disk
2337 * or is beyond or equal to the redo pointer of the current checkpoint
2338 * (See ReplicationSlotReserveWal for details).
2339 */
2341
2343 {
2345 char path[MAXPGPATH];
2346
2347 if (!s->in_use)
2348 continue;
2349
2350 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2351 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2352
2353 /*
2354 * Slot's data is not flushed each time the confirmed_flush LSN is
2355 * updated as that could lead to frequent writes. However, we decide
2356 * to force a flush of all logical slots' data at the time of shutdown
2357 * if the confirmed_flush LSN has changed since we last flushed it to
2358 * disk. This helps in avoiding an unnecessary retreat of the
2359 * confirmed_flush LSN after restart.
2360 */
2361 if (is_shutdown && SlotIsLogical(s))
2362 {
2364
2365 if (s->data.invalidated == RS_INVAL_NONE &&
2367 {
2368 s->just_dirtied = true;
2369 s->dirty = true;
2370 }
2372 }
2373
2374 /*
2375 * Track if we're going to update slot's last_saved_restart_lsn. We
2376 * need this to know if we need to recompute the required LSN.
2377 */
2380
2381 SaveSlotToPath(s, path, LOG);
2382 }
2384
2385 /*
2386 * Recompute the required LSN if SaveSlotToPath() updated
2387 * last_saved_restart_lsn for any slot.
2388 */
2391}
2392
2393/*
2394 * Load all replication slots from disk into memory at server startup. This
2395 * needs to be run before we start crash recovery.
 *
 * Entries whose names end in ".tmp" are leftovers from an interrupted slot
 * creation or deletion; they are removed (with a WARNING on failure) rather
 * than restored.
2396 */
2397void
2399{
2401 struct dirent *replication_de;
2402
2403 elog(DEBUG1, "starting up replication slots");
2404
2405 /* restore all slots by iterating over all on-disk entries */
2408 {
2409 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2411
2412 if (strcmp(replication_de->d_name, ".") == 0 ||
2413 strcmp(replication_de->d_name, "..") == 0)
2414 continue;
2415
2416 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2418
2419 /* we're only creating directories here, skip if it's not ours */
2421 continue;
2422
2423 /* we crashed while a slot was being set up or deleted, clean up */
2424 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2425 {
2426 if (!rmtree(path, true))
2427 {
2429 (errmsg("could not remove directory \"%s\"",
2430 path)));
2431 continue;
2432 }
2434 continue;
2435 }
2436
2437 /* looks like a slot in a normal state, restore */
2439 }
2441
2442 /* currently no slots exist, we're done. */
2444 return;
2445
2446 /* Now that we have recovered all the data, compute replication xmin */
2449}
2450
2451/* ----
2452 * Manipulation of on-disk state of replication slots
2453 *
2454 * NB: none of the routines below should take any notice of whether a slot is
2455 * the current one or not; that's all handled a layer above.
2456 * ----
2457 */
/*
 * Create the on-disk directory for a new slot. The directory is first built
 * as "<slot name>.tmp" and fsynced, the state file is written (dirty is
 * forced to true), and then the directory is renamed into place and fsynced
 * again. Raises an ERROR if the directory cannot be created or renamed.
 */
2458static void
2460{
2461 char tmppath[MAXPGPATH];
2462 char path[MAXPGPATH];
2463 struct stat st;
2464
2465 /*
2466 * No need to take out the io_in_progress_lock; nobody else can see this
2467 * slot yet, so nobody else will write. We're reusing SaveSlotToPath, which
2468 * takes out the lock; if we took the lock here, we'd deadlock.
2469 */
2470
2471 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2472 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2473
2474 /*
2475 * It's just barely possible that some previous effort to create or drop a
2476 * slot with this name left a temp directory lying around. If that seems
2477 * to be the case, try to remove it. If the rmtree() fails, we'll error
2478 * out at the MakePGDirectory() below, so we don't bother checking
2479 * success.
2480 */
2481 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2482 rmtree(tmppath, true);
2483
2484 /* Create and fsync the temporary slot directory. */
2485 if (MakePGDirectory(tmppath) < 0)
2486 ereport(ERROR,
2488 errmsg("could not create directory \"%s\": %m",
2489 tmppath)));
2490 fsync_fname(tmppath, true);
2491
2492 /* Write the actual state file. */
2493 slot->dirty = true; /* signal that we really need to write */
2495
2496 /* Rename the directory into place. */
2497 if (rename(tmppath, path) != 0)
2498 ereport(ERROR,
2500 errmsg("could not rename file \"%s\" to \"%s\": %m",
2501 tmppath, path)));
2502
2503 /*
2504 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2505 * would persist after an OS crash or not - so, force a restart. The
2506 * restart would try to fsync this again till it works.
2507 */
2509
2510 fsync_fname(path, true);
2512
2514}
2515
2516/*
2517 * Shared functionality between saving and creating a replication slot.
2518 */
2519static void
2520SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2521{
2522 char tmppath[MAXPGPATH];
2523 char path[MAXPGPATH];
2524 int fd;
2526 bool was_dirty;
2527
2528 /* first check whether there's something to write out */
2529 SpinLockAcquire(&slot->mutex);
2530 was_dirty = slot->dirty;
2531 slot->just_dirtied = false;
2532 SpinLockRelease(&slot->mutex);
2533
2534 /* and don't do anything if there's nothing to write */
2535 if (!was_dirty)
2536 return;
2537
2539
2540 /* silence valgrind :( */
2541 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2542
2543 sprintf(tmppath, "%s/state.tmp", dir);
2544 sprintf(path, "%s/state", dir);
2545
2547 if (fd < 0)
2548 {
2549 /*
2550 * If not an ERROR, then release the lock before returning. In case
2551 * of an ERROR, the error recovery path automatically releases the
2552 * lock, but no harm in explicitly releasing even in that case. Note
2553 * that LWLockRelease() could affect errno.
2554 */
2555 int save_errno = errno;
2556
2558 errno = save_errno;
2559 ereport(elevel,
2561 errmsg("could not create file \"%s\": %m",
2562 tmppath)));
2563 return;
2564 }
2565
2566 cp.magic = SLOT_MAGIC;
2567 INIT_CRC32C(cp.checksum);
2568 cp.version = SLOT_VERSION;
2570
2571 SpinLockAcquire(&slot->mutex);
2572
2573 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2574
2575 SpinLockRelease(&slot->mutex);
2576
2577 COMP_CRC32C(cp.checksum,
2580 FIN_CRC32C(cp.checksum);
2581
2582 errno = 0;
2584 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2585 {
2586 int save_errno = errno;
2587
2590 unlink(tmppath);
2592
2593 /* if write didn't set errno, assume problem is no disk space */
2595 ereport(elevel,
2597 errmsg("could not write to file \"%s\": %m",
2598 tmppath)));
2599 return;
2600 }
2602
2603 /* fsync the temporary file */
2605 if (pg_fsync(fd) != 0)
2606 {
2607 int save_errno = errno;
2608
2611 unlink(tmppath);
2613
2614 errno = save_errno;
2615 ereport(elevel,
2617 errmsg("could not fsync file \"%s\": %m",
2618 tmppath)));
2619 return;
2620 }
2622
2623 if (CloseTransientFile(fd) != 0)
2624 {
2625 int save_errno = errno;
2626
2627 unlink(tmppath);
2629
2630 errno = save_errno;
2631 ereport(elevel,
2633 errmsg("could not close file \"%s\": %m",
2634 tmppath)));
2635 return;
2636 }
2637
2638 /* rename to permanent file, fsync file and directory */
2639 if (rename(tmppath, path) != 0)
2640 {
2641 int save_errno = errno;
2642
2643 unlink(tmppath);
2645
2646 errno = save_errno;
2647 ereport(elevel,
2649 errmsg("could not rename file \"%s\" to \"%s\": %m",
2650 tmppath, path)));
2651 return;
2652 }
2653
2654 /*
2655 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2656 */
2658
2659 fsync_fname(path, false);
2660 fsync_fname(dir, true);
2662
2664
2665 /*
2666 * Successfully wrote, unset dirty bit, unless somebody dirtied again
2667 * already and remember the confirmed_flush LSN value.
2668 */
2669 SpinLockAcquire(&slot->mutex);
2670 if (!slot->just_dirtied)
2671 slot->dirty = false;
2672 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2673 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2674 SpinLockRelease(&slot->mutex);
2675
2677}
2678
2679/*
2680 * Load a single slot from disk into memory.
2681 */
2682static void
2683RestoreSlotFromDisk(const char *name)
2684{
2686 int i;
2687 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2688 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2689 int fd;
2690 bool restored = false;
2691 int readBytes;
2692 pg_crc32c checksum;
2693 TimestampTz now = 0;
2694
2695 /* no need to lock here, no concurrent access allowed yet */
2696
2697 /* delete temp file if it exists */
2698 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2699 sprintf(path, "%s/state.tmp", slotdir);
2700 if (unlink(path) < 0 && errno != ENOENT)
2701 ereport(PANIC,
2703 errmsg("could not remove file \"%s\": %m", path)));
2704
2705 sprintf(path, "%s/state", slotdir);
2706
2707 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2708
2709 /* on some operating systems fsyncing a file requires O_RDWR */
2711
2712 /*
2713 * We do not need to handle this as we are rename()ing the directory into
2714 * place only after we fsync()ed the state file.
2715 */
2716 if (fd < 0)
2717 ereport(PANIC,
2719 errmsg("could not open file \"%s\": %m", path)));
2720
2721 /*
2722 * Sync state file before we're reading from it. We might have crashed
2723 * while it wasn't synced yet and we shouldn't continue on that basis.
2724 */
2726 if (pg_fsync(fd) != 0)
2727 ereport(PANIC,
2729 errmsg("could not fsync file \"%s\": %m",
2730 path)));
2732
2733 /* Also sync the parent directory */
2735 fsync_fname(slotdir, true);
2737
2738 /* read part of statefile that's guaranteed to be version independent */
2743 {
2744 if (readBytes < 0)
2745 ereport(PANIC,
2747 errmsg("could not read file \"%s\": %m", path)));
2748 else
2749 ereport(PANIC,
2751 errmsg("could not read file \"%s\": read %d of %zu",
2752 path, readBytes,
2754 }
2755
2756 /* verify magic */
2757 if (cp.magic != SLOT_MAGIC)
2758 ereport(PANIC,
2760 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2761 path, cp.magic, SLOT_MAGIC)));
2762
2763 /* verify version */
2764 if (cp.version != SLOT_VERSION)
2765 ereport(PANIC,
2767 errmsg("replication slot file \"%s\" has unsupported version %u",
2768 path, cp.version)));
2769
2770 /* boundary check on length */
2771 if (cp.length != ReplicationSlotOnDiskV2Size)
2772 ereport(PANIC,
2774 errmsg("replication slot file \"%s\" has corrupted length %u",
2775 path, cp.length)));
2776
2777 /* Now that we know the size, read the entire file */
2779 readBytes = read(fd,
2781 cp.length);
2783 if (readBytes != cp.length)
2784 {
2785 if (readBytes < 0)
2786 ereport(PANIC,
2788 errmsg("could not read file \"%s\": %m", path)));
2789 else
2790 ereport(PANIC,
2792 errmsg("could not read file \"%s\": read %d of %zu",
2793 path, readBytes, (Size) cp.length)));
2794 }
2795
2796 if (CloseTransientFile(fd) != 0)
2797 ereport(PANIC,
2799 errmsg("could not close file \"%s\": %m", path)));
2800
2801 /* now verify the CRC */
2802 INIT_CRC32C(checksum);
2803 COMP_CRC32C(checksum,
2806 FIN_CRC32C(checksum);
2807
2808 if (!EQ_CRC32C(checksum, cp.checksum))
2809 ereport(PANIC,
2810 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2811 path, checksum, cp.checksum)));
2812
2813 /*
2814 * If we crashed with an ephemeral slot active, don't restore but delete
2815 * it.
2816 */
2817 if (cp.slotdata.persistency != RS_PERSISTENT)
2818 {
2819 if (!rmtree(slotdir, true))
2820 {
2822 (errmsg("could not remove directory \"%s\"",
2823 slotdir)));
2824 }
2826 return;
2827 }
2828
2829 /*
2830 * Verify that requirements for the specific slot type are met. That's
2831 * important because if these aren't met we're not guaranteed to retain
2832 * all the necessary resources for the slot.
2833 *
2834 * NB: We have to do so *after* the above checks for ephemeral slots,
2835 * because otherwise a slot that shouldn't exist anymore could prevent
2836 * restarts.
2837 *
2838 * NB: Changing the requirements here also requires adapting
2839 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2840 */
2841 if (cp.slotdata.database != InvalidOid)
2842 {
2844 ereport(FATAL,
2846 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2847 NameStr(cp.slotdata.name)),
2848 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2849
2850 /*
2851 * In standby mode, the hot standby must be enabled. This check is
2852 * necessary to ensure logical slots are invalidated when they become
2853 * incompatible due to insufficient wal_level. Otherwise, if the
2854 * primary lowers effective_wal_level below logical, thereby disabling
2855 * logical decoding, while hot standby is disabled, logical slots would
2856 * remain valid even after promotion.
2857 */
2859 ereport(FATAL,
2861 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2862 NameStr(cp.slotdata.name)),
2863 errhint("Change \"hot_standby\" to be \"on\".")));
2864 }
2865 else if (wal_level < WAL_LEVEL_REPLICA)
2866 ereport(FATAL,
2868 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2869 NameStr(cp.slotdata.name)),
2870 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2871
2872 /*
2873 * Nothing can be active yet, don't lock anything. Note we iterate up to
2874 * max_replication_slots instead of adding max_repack_replication_slots as
2875 * in all other places, because we must enforce the GUC value in case
2876 * there were more slots before the shutdown than what it is set up to
2877 * now.
2878 */
2879 for (i = 0; i < max_replication_slots; i++)
2880 {
2881 ReplicationSlot *slot;
2882
2884
2885 if (slot->in_use)
2886 continue;
2887
2888 /* restore the entire set of persistent data */
2889 memcpy(&slot->data, &cp.slotdata,
2891
2892 /* initialize in memory state */
2893 slot->effective_xmin = cp.slotdata.xmin;
2894 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2895 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2896 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2897
2902
2903 slot->in_use = true;
2905
2906 /*
2907 * Set the time since the slot has become inactive after loading the
2908 * slot from the disk into memory. Whoever acquires the slot i.e.
2909 * makes the slot active will reset it. Use the same inactive_since
2910 * time for all the slots.
2911 */
2912 if (now == 0)
2914
2916
2917 restored = true;
2918 break;
2919 }
2920
2921 if (!restored)
2922 ereport(FATAL,
2923 (errmsg("too many replication slots active before shutdown"),
2924 errhint("Increase \"max_replication_slots\" and try again.")));
2925}
2926
2927/*
2928 * Maps an invalidation reason for a replication slot to
2929 * ReplicationSlotInvalidationCause.
2930 */
2932GetSlotInvalidationCause(const char *cause_name)
2933{
2934 Assert(cause_name);
2935
2936 /* Search lookup table for the cause having this name */
2937 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2938 {
2939 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2941 }
2942
2943 Assert(false);
2944 return RS_INVAL_NONE; /* to keep compiler quiet */
2945}
2946
2947/*
2948 * Maps a ReplicationSlotInvalidationCause to the invalidation
2949 * reason for a replication slot.
2950 */
2951const char *
2953{
2954 /* Search lookup table for the name of this cause */
2955 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2956 {
2957 if (SlotInvalidationCauses[i].cause == cause)
2959 }
2960
2961 Assert(false);
2962 return "none"; /* to keep compiler quiet */
2963}
2964
2965/*
2966 * A helper function to validate slots specified in GUC synchronized_standby_slots.
2967 *
2968 * The rawname will be parsed, and the result will be saved into *elemlist.
2969 */
2970static bool
2972{
2973 /* Verify syntax and parse string into a list of identifiers */
2975 {
2976 GUC_check_errdetail("List syntax is invalid.");
2977 return false;
2978 }
2979
2980 /* Iterate the list to validate each slot name */
2981 foreach_ptr(char, name, *elemlist)
2982 {
2983 int err_code;
2984 char *err_msg = NULL;
2985 char *err_hint = NULL;
2986
2988 &err_msg, &err_hint))
2989 {
2991 GUC_check_errdetail("%s", err_msg);
2992 if (err_hint != NULL)
2994 return false;
2995 }
2996 }
2997
2998 return true;
2999}
3000
3001/*
3002 * GUC check_hook for synchronized_standby_slots
3003 */
3004bool
3006{
3007 char *rawname;
3008 char *ptr;
3009 List *elemlist;
3010 int size;
3011 bool ok;
3013
3014 if ((*newval)[0] == '\0')
3015 return true;
3016
3017 /* Need a modifiable copy of the GUC string */
3019
3020 /* Now verify if the specified slots exist and have correct type */
3022
3023 if (!ok || elemlist == NIL)
3024 {
3025 pfree(rawname);
3027 return ok;
3028 }
3029
3030 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3031 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3032 foreach_ptr(char, slot_name, elemlist)
3033 size += strlen(slot_name) + 1;
3034
3035 /* GUC extra value must be guc_malloc'd, not palloc'd */
3036 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3037 if (!config)
3038 return false;
3039
3040 /* Transform the data into SyncStandbySlotsConfigData */
3041 config->nslotnames = list_length(elemlist);
3042
3043 ptr = config->slot_names;
3044 foreach_ptr(char, slot_name, elemlist)
3045 {
3046 strcpy(ptr, slot_name);
3047 ptr += strlen(slot_name) + 1;
3048 }
3049
3050 *extra = config;
3051
3052 pfree(rawname);
3054 return true;
3055}
3056
3057/*
3058 * GUC assign_hook for synchronized_standby_slots
3059 */
3060void
3061assign_synchronized_standby_slots(const char *newval, void *extra)
3062{
3063 /*
3064 * The standby slots may have changed, so we must recompute the oldest
3065 * LSN.
3066 */
3068
3070}
3071
3072/*
3073 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3074 */
3075bool
3076SlotExistsInSyncStandbySlots(const char *slot_name)
3077{
3078 const char *standby_slot_name;
3079
3080 /* Return false if there is no value in synchronized_standby_slots */
3082 return false;
3083
3084 /*
3085 * XXX: We are not expecting this list to be long so a linear search
3086 * shouldn't hurt but if that turns out not to be true then we can cache
3087 * this information for each WalSender as well.
3088 */
3090 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3091 {
3092 if (strcmp(standby_slot_name, slot_name) == 0)
3093 return true;
3094
3096 }
3097
3098 return false;
3099}
3100
3101/*
3102 * Return true if the slots specified in synchronized_standby_slots have caught up to
3103 * the given WAL location, false otherwise.
3104 *
3105 * The elevel parameter specifies the error level used for logging messages
3106 * related to slots that do not exist, are invalidated, or are inactive.
3107 */
3108bool
3110{
3111 const char *name;
3112 int caught_up_slot_num = 0;
3114
3115 /*
3116 * Don't need to wait for the standbys to catch up if there is no value in
3117 * synchronized_standby_slots.
3118 */
3120 return true;
3121
3122 /*
3123 * Don't need to wait for the standbys to catch up if we are on a standby
3124 * server, since we do not support syncing slots to cascading standbys.
3125 */
3126 if (RecoveryInProgress())
3127 return true;
3128
3129 /*
3130 * Don't need to wait for the standbys to catch up if they are already
3131 * beyond the specified WAL location.
3132 */
3135 return true;
3136
3137 /*
3138 * To prevent concurrent slot dropping and creation while filtering the
3139 * slots, take the ReplicationSlotControlLock outside of the loop.
3140 */
3142
3144 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3145 {
3146 XLogRecPtr restart_lsn;
3147 bool invalidated;
3148 bool inactive;
3149 ReplicationSlot *slot;
3150
3151 slot = SearchNamedReplicationSlot(name, false);
3152
3153 /*
3154 * If a slot name provided in synchronized_standby_slots does not
3155 * exist, report a message and exit the loop.
3156 */
3157 if (!slot)
3158 {
3159 ereport(elevel,
3161 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3162 name, "synchronized_standby_slots"),
3163 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3164 name),
3165 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3166 name, "synchronized_standby_slots"));
3167 break;
3168 }
3169
3170 /* Same as above: if a slot is not physical, exit the loop. */
3171 if (SlotIsLogical(slot))
3172 {
3173 ereport(elevel,
3175 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3176 name, "synchronized_standby_slots"),
3177 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3178 name),
3179 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3180 name, "synchronized_standby_slots"));
3181 break;
3182 }
3183
3184 SpinLockAcquire(&slot->mutex);
3185 restart_lsn = slot->data.restart_lsn;
3186 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3188 SpinLockRelease(&slot->mutex);
3189
3190 if (invalidated)
3191 {
3192 /* Specified physical slot has been invalidated */
3193 ereport(elevel,
3195 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3196 name, "synchronized_standby_slots"),
3197 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3198 name),
3199 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3200 name, "synchronized_standby_slots"));
3201 break;
3202 }
3203
3204 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3205 {
3206 /* Log a message if no active_pid for this physical slot */
3207 if (inactive)
3208 ereport(elevel,
3210 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3211 name, "synchronized_standby_slots"),
3212 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3213 name),
3214 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3215 name, "synchronized_standby_slots"));
3216
3217 /* Stop checking: the current slot hasn't caught up yet. */
3218 break;
3219 }
3220
3221 Assert(restart_lsn >= wait_for_lsn);
3222
3224 min_restart_lsn > restart_lsn)
3225 min_restart_lsn = restart_lsn;
3226
3228
3229 name += strlen(name) + 1;
3230 }
3231
3233
3234 /*
3235 * Return false if not all the standbys have caught up to the specified
3236 * WAL location.
3237 */
3239 return false;
3240
3241 /* The ss_oldest_flush_lsn must not retreat. */
3244
3246
3247 return true;
3248}
3249
3250/*
3251 * Wait for physical standbys to confirm receiving the given lsn.
3252 *
3253 * Used by logical decoding SQL functions. It waits for physical standbys
3254 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3255 */
3256void
3258{
3259 /*
3260 * Don't need to wait for the standby to catch up if the current acquired
3261 * slot is not a logical failover slot, or there is no value in
3262 * synchronized_standby_slots.
3263 */
3265 return;
3266
3268
3269 for (;;)
3270 {
3272
3274 {
3275 ConfigReloadPending = false;
3277 }
3278
3279 /* Exit if done waiting for every slot. */
3281 break;
3282
3283 /*
3284 * Wait for the slots in the synchronized_standby_slots to catch up,
3285 * but use a timeout (1s) so we can also check if the
3286 * synchronized_standby_slots has been changed.
3287 */
3290 }
3291
3293}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition timestamp.c:1729
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition timestamp.c:1803
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
#define NameStr(name)
Definition c.h:835
#define ngettext(s, p, n)
Definition c.h:1270
#define Assert(condition)
Definition c.h:943
#define PG_BINARY
Definition c.h:1386
uint64_t uint64
Definition c.h:625
#define pg_unreachable()
Definition c.h:367
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
size_t Size
Definition c.h:689
uint32 result
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1323
int errcode_for_file_access(void)
Definition elog.c:898
int errcode(int sqlerrcode)
Definition elog.c:875
#define _(x)
Definition elog.c:96
#define LOG
Definition elog.h:32
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define FATAL
Definition elog.h:42
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
int int errhint_internal(const char *fmt,...) pg_attribute_printf(1
int MakePGDirectory(const char *directoryName)
Definition fd.c:3963
int FreeDir(DIR *dir)
Definition fd.c:3009
int CloseTransientFile(int fd)
Definition fd.c:2855
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:757
DIR * AllocateDir(const char *dirname)
Definition fd.c:2891
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2957
int pg_fsync(int fd)
Definition fd.c:390
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_DIR
Definition file_utils.h:23
@ PGFILETYPE_ERROR
Definition file_utils.h:20
bool IsBinaryUpgrade
Definition globals.c:123
ProcNumber MyProcNumber
Definition globals.c:92
bool IsUnderPostmaster
Definition globals.c:122
Oid MyDatabaseId
Definition globals.c:96
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
void GUC_check_errcode(int sqlerrcode)
Definition guc.c:6666
void * guc_malloc(int elevel, size_t size)
Definition guc.c:637
#define newval
#define GUC_check_errdetail
Definition guc.h:507
GucSource
Definition guc.h:112
@ PGC_SIGHUP
Definition guc.h:75
#define GUC_check_errhint
Definition guc.h:511
#define IS_INJECTION_POINT_ATTACHED(name)
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int i
Definition isn.c:77
bool IsLogicalLauncher(void)
Definition launcher.c:1588
void list_free(List *list)
Definition list.c:1546
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:423
bool LWLockHeldByMe(LWLock *lock)
Definition lwlock.c:1885
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1929
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:670
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
Size mul_size(Size s1, Size s2)
Definition mcxt.c:1752
#define START_CRIT_SECTION()
Definition miscadmin.h:152
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
@ B_STARTUP
Definition miscadmin.h:368
#define END_CRIT_SECTION()
Definition miscadmin.h:154
Oid GetUserId(void)
Definition miscinit.c:470
BackendType MyBackendType
Definition miscinit.c:65
bool has_rolreplication(Oid roleid)
Definition miscinit.c:689
void namestrcpy(Name name, const char *str)
Definition name.c:233
static char * errmsg
#define ERRCODE_DATA_CORRUPTED
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:177
#define EQ_CRC32C(c1, c2)
Definition pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:182
const void size_t len
const void * data
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static bool two_phase
static bool failover
static rewind_source * source
Definition pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition port.h:263
#define snprintf
Definition port.h:261
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
#define GetPGProcByNumber(n)
Definition proc.h:504
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition procarray.c:3942
bool SignalRecoveryConflict(PGPROC *proc, pid_t pid, RecoveryConflictReason reason)
Definition procarray.c:3446
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50
#define ShmemRequestStruct(...)
Definition shmem.h:176
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition slot.c:581
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:629
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition slot.c:1976
static void ReplicationSlotsShmemInit(void *arg)
Definition slot.c:221
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition slot.c:115
char * synchronized_standby_slots
Definition slot.c:176
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition slot.c:3061
#define ReplicationSlotOnDiskChecksummedSize
Definition slot.c:137
void CheckPointReplicationSlots(bool is_shutdown)
Definition slot.c:2320
int idle_replication_slot_timeout_secs
Definition slot.c:170
void ReplicationSlotMarkDirty(void)
Definition slot.c:1180
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition slot.c:1884
void ReplicationSlotReserveWal(void)
Definition slot.c:1707
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition slot.c:1453
static XLogRecPtr ss_oldest_flush_lsn
Definition slot.c:185
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:310
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition slot.c:1514
#define ReplicationSlotOnDiskNotChecksummedSize
Definition slot.c:134
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1374
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition slot.c:2932
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1222
static void RestoreSlotFromDisk(const char *name)
Definition slot.c:2683
void ReplicationSlotPersist(void)
Definition slot.c:1197
bool CheckLogicalSlotExists(void)
Definition slot.c:1619
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition slot.c:1786
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition slot.c:2520
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:915
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3076
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition slot.c:2971
void ReplicationSlotSave(void)
Definition slot.c:1162
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:548
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition slot.c:2459
#define ReplicationSlotOnDiskV2Size
Definition slot.c:140
void CheckSlotPermissions(void)
Definition slot.c:1690
bool ReplicationSlotName(int index, Name name)
Definition slot.c:598
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition slot.c:3005
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:265
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:946
void ReplicationSlotRelease(void)
Definition slot.c:769
int max_replication_slots
Definition slot.c:161
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:147
#define SLOT_VERSION
Definition slot.c:144
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition slot.c:3257
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3109
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1304
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:861
int max_repack_replication_slots
Definition slot.c:163
void ReplicationSlotInitialize(void)
Definition slot.c:240
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition slot.c:1055
void StartupReplicationSlots(void)
Definition slot.c:2398
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition slot.c:1868
void CheckSlotRequirements(bool repack)
Definition slot.c:1661
void ReplicationSlotDropAcquired(bool try_disable)
Definition slot.c:1031
#define SLOT_MAGIC
Definition slot.c:143
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition slot.c:2216
static void ReplicationSlotsShmemRequest(void *arg)
Definition slot.c:200
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition slot.c:179
#define ReplicationSlotOnDiskConstantSize
Definition slot.c:131
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2952
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition slot.c:249
static bool IsSlotForConflictCheck(const char *name)
Definition slot.c:360
#define CONFLICT_DETECTION_SLOT
Definition slot.h:28
#define RS_INVAL_MAX_CAUSES
Definition slot.h:72
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
#define SlotIsPhysical(slot)
Definition slot.h:287
#define PG_REPLSLOT_DIR
Definition slot.h:21
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60
#define SlotIsLogical(slot)
Definition slot.h:288
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:306
@ SS_SKIP_NONE
Definition slot.h:82
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1916
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
PGPROC * MyProc
Definition proc.c:71
PROC_HDR * ProcGlobal
Definition proc.c:74
XLogRecPtr LogStandbySnapshot(void)
Definition standby.c:1284
@ RECOVERY_CONFLICT_LOGICALSLOT
Definition standby.h:46
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition dirent.c:26
Definition pg_list.h:54
uint8 statusFlags
Definition proc.h:210
int pgxactoff
Definition proc.h:207
uint8 * statusFlags
Definition proc.h:456
ReplicationSlot replication_slots[1]
Definition slot.h:299
ReplicationSlotPersistency persistency
Definition slot.h:106
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
XLogRecPtr candidate_xmin_lsn
Definition slot.h:229
TransactionId effective_catalog_xmin
Definition slot.h:210
slock_t mutex
Definition slot.h:183
XLogRecPtr candidate_restart_valid
Definition slot.h:230
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:238
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:284
bool in_use
Definition slot.h:186
TransactionId effective_xmin
Definition slot.h:209
bool just_dirtied
Definition slot.h:195
XLogRecPtr last_saved_restart_lsn
Definition slot.h:271
XLogRecPtr candidate_restart_lsn
Definition slot.h:231
LWLock io_in_progress_lock
Definition slot.h:216
ConditionVariable active_cv
Definition slot.h:219
TransactionId candidate_catalog_xmin
Definition slot.h:228
ProcNumber active_proc
Definition slot.h:192
ReplicationSlotPersistentData data
Definition slot.h:213
TimestampTz inactive_since
Definition slot.h:245
const char * cause_name
Definition slot.c:112
ReplicationSlotInvalidationCause cause
Definition slot.c:111
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition slot.c:103
ConditionVariable wal_confirm_rcv_cv
Definition type.h:97
Definition c.h:830
#define InvalidTransactionId
Definition transam.h:31
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define TransactionIdIsValid(xid)
Definition transam.h:41
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition varlena.c:2870
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
const char * name
bool am_walsender
Definition walsender.c:135
bool log_replication_commands
Definition walsender.c:150
WalSndCtlData * WalSndCtl
Definition walsender.c:121
#define stat
Definition win32_port.h:74
#define S_ISDIR(m)
Definition win32_port.h:315
#define kill(pid, sig)
Definition win32_port.h:490
bool RecoveryInProgress(void)
Definition xlog.c:6836
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3809
bool EnableHotStandby
Definition xlog.c:128
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6939
int wal_level
Definition xlog.c:138
int wal_segment_size
Definition xlog.c:150
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition xlog.c:2687
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:10096
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
@ WAL_LEVEL_REPLICA
Definition xlog.h:77
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint64 XLogSegNo
Definition xlogdefs.h:52
bool StandbyMode
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

◆ ReplicationSlotOnDiskConstantSize

#define ReplicationSlotOnDiskConstantSize    offsetof(ReplicationSlotOnDisk, slotdata)

Definition at line 131 of file slot.c.

◆ ReplicationSlotOnDiskNotChecksummedSize

#define ReplicationSlotOnDiskNotChecksummedSize    offsetof(ReplicationSlotOnDisk, version)

Definition at line 134 of file slot.c.

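◆ ReplicationSlotOnDiskV2Size

#define ReplicationSlotOnDiskV2Size    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize

Definition at line 140 of file slot.c.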

◆ SLOT_MAGIC

#define SLOT_MAGIC   0x1051CA1 /* format identifier */

Definition at line 143 of file slot.c.

◆ SLOT_VERSION

#define SLOT_VERSION   5 /* version for new files */

Definition at line 144 of file slot.c.

Typedef Documentation

◆ ReplicationSlotOnDisk

◆ SlotInvalidationCauseMap

Function Documentation

◆ assign_synchronized_standby_slots()

void assign_synchronized_standby_slots ( const char * newval,
void * extra 
)

Definition at line 3061 of file slot.c.

3062{
3063 /*
3064 * The standby slots may have changed, so we must recompute the oldest
3065 * LSN.
3066 */
3068
3070}

References InvalidXLogRecPtr, ss_oldest_flush_lsn, and synchronized_standby_slots_config.

◆ CanInvalidateIdleSlot()

◆ check_synchronized_standby_slots()

bool check_synchronized_standby_slots ( char **  newval,
void **  extra,
GucSource  source 
)

Definition at line 3005 of file slot.c.

3006{
3007 char *rawname;
3008 char *ptr;
3009 List *elemlist;
3010 int size;
3011 bool ok;
3013
3014 if ((*newval)[0] == '\0')
3015 return true;
3016
3017 /* Need a modifiable copy of the GUC string */
3019
3020 /* Now verify if the specified slots exist and have correct type */
3022
3023 if (!ok || elemlist == NIL)
3024 {
3025 pfree(rawname);
3027 return ok;
3028 }
3029
3030 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3031 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3032 foreach_ptr(char, slot_name, elemlist)
3033 size += strlen(slot_name) + 1;
3034
3035 /* GUC extra value must be guc_malloc'd, not palloc'd */
3036 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3037 if (!config)
3038 return false;
3039
3040 /* Transform the data into SyncStandbySlotsConfigData */
3041 config->nslotnames = list_length(elemlist);
3042
3043 ptr = config->slot_names;
3044 foreach_ptr(char, slot_name, elemlist)
3045 {
3046 strcpy(ptr, slot_name);
3047 ptr += strlen(slot_name) + 1;
3048 }
3049
3050 *extra = config;
3051
3052 pfree(rawname);
3054 return true;
3055}

References fb(), foreach_ptr, guc_malloc(), list_free(), list_length(), LOG, newval, NIL, SyncStandbySlotsConfigData::nslotnames, pfree(), pstrdup(), SyncStandbySlotsConfigData::slot_names, and validate_sync_standby_slots().

◆ CheckLogicalSlotExists()

bool CheckLogicalSlotExists ( void  )

Definition at line 1619 of file slot.c.

1620{
1621 bool found = false;
1622
1624 return false;
1625
1628 {
1629 ReplicationSlot *s;
1630 bool invalidated;
1631
1633
1634 /* cannot change while ReplicationSlotCtlLock is held */
1635 if (!s->in_use)
1636 continue;
1637
1638 if (SlotIsPhysical(s))
1639 continue;
1640
1642 invalidated = s->data.invalidated != RS_INVAL_NONE;
1644
1645 if (invalidated)
1646 continue;
1647
1648 found = true;
1649 break;
1650 }
1652
1653 return found;
1654}

References ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SlotIsPhysical, SpinLockAcquire(), and SpinLockRelease().

Referenced by DisableLogicalDecoding(), and UpdateLogicalDecodingStatusEndOfRecovery().

◆ CheckPointReplicationSlots()

void CheckPointReplicationSlots ( bool  is_shutdown)

Definition at line 2320 of file slot.c.

2321{
2322 int i;
2323 bool last_saved_restart_lsn_updated = false;
2324
2325 elog(DEBUG1, "performing replication slot checkpoint");
2326
2327 /*
2328 * Prevent any slot from being created/dropped while we're active. As we
2329 * explicitly do *not* want to block iterating over replication_slots or
2330 * acquiring a slot we cannot take the control lock - but that's OK,
2331 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2332 * enough to guarantee that nobody can change the in_use bits on us.
2333 *
2334 * Additionally, acquiring the Allocation lock is necessary to serialize
2335 * the slot flush process with concurrent slot WAL reservation. This
2336 * ensures that the WAL position being reserved is either flushed to disk
2337 * or is beyond or equal to the redo pointer of the current checkpoint
2338 * (See ReplicationSlotReserveWal for details).
2339 */
2341
2343 {
2345 char path[MAXPGPATH];
2346
2347 if (!s->in_use)
2348 continue;
2349
2350 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2351 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2352
2353 /*
2354 * Slot's data is not flushed each time the confirmed_flush LSN is
2355 * updated as that could lead to frequent writes. However, we decide
2356 * to force a flush of all logical slot's data at the time of shutdown
2357 * if the confirmed_flush LSN is changed since we last flushed it to
2358 * disk. This helps in avoiding an unnecessary retreat of the
2359 * confirmed_flush LSN after restart.
2360 */
2361 if (is_shutdown && SlotIsLogical(s))
2362 {
2364
2365 if (s->data.invalidated == RS_INVAL_NONE &&
2367 {
2368 s->just_dirtied = true;
2369 s->dirty = true;
2370 }
2372 }
2373
2374 /*
2375 * Track if we're going to update slot's last_saved_restart_lsn. We
2376 * need this to know if we need to recompute the required LSN.
2377 */
2380
2381 SaveSlotToPath(s, path, LOG);
2382 }
2384
2385 /*
2386 * Recompute the required LSN if SaveSlotToPath() updated
2387 * last_saved_restart_lsn for any slot.
2388 */
2391}

References ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, DEBUG1, ReplicationSlot::dirty, elog, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LOG, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SaveSlotToPath(), SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and sprintf.

Referenced by CheckPointGuts().

◆ CheckSlotPermissions()

void CheckSlotPermissions ( void  )

Definition at line 1690 of file slot.c.

1691{
1693 ereport(ERROR,
1695 errmsg("permission denied to use replication slots"),
1696 errdetail("Only roles with the %s attribute may use replication slots.",
1697 "REPLICATION")));
1698}

References ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetUserId(), and has_rolreplication().

Referenced by copy_replication_slot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_drop_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), and pg_sync_replication_slots().

◆ CheckSlotRequirements()

void CheckSlotRequirements ( bool  repack)

Definition at line 1661 of file slot.c.

1662{
1663 /*
1664 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1665 * needs the same check.
1666 */
1667
1668 if (!repack && max_replication_slots == 0)
1669 ereport(ERROR,
1671 errmsg("replication slots can only be used if \"%s\" > 0",
1672 "max_replication_slots"));
1673
1675 ereport(ERROR,
1677 errmsg("REPACK can only be used if \"%s\" > 0",
1678 "max_repack_replication_slots"));
1679
1681 ereport(ERROR,
1683 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1684}

References ereport, errcode(), errmsg, ERROR, fb(), max_repack_replication_slots, max_replication_slots, wal_level, and WAL_LEVEL_REPLICA.

Referenced by CheckLogicalDecodingRequirements(), copy_replication_slot(), pg_create_physical_replication_slot(), and pg_drop_replication_slot().

◆ CreateSlotOnDisk()

static void CreateSlotOnDisk ( ReplicationSlot * slot)
static

Definition at line 2459 of file slot.c.

2460{
2461 char tmppath[MAXPGPATH];
2462 char path[MAXPGPATH];
2463 struct stat st;
2464
2465 /*
2466 * No need to take out the io_in_progress_lock, nobody else can see this
2467 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2468 * takes out the lock, if we'd take the lock here, we'd deadlock.
2469 */
2470
2471 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2472 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2473
2474 /*
2475 * It's just barely possible that some previous effort to create or drop a
2476 * slot with this name left a temp directory lying around. If that seems
2477 * to be the case, try to remove it. If the rmtree() fails, we'll error
2478 * out at the MakePGDirectory() below, so we don't bother checking
2479 * success.
2480 */
2481 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2482 rmtree(tmppath, true);
2483
2484 /* Create and fsync the temporary slot directory. */
2485 if (MakePGDirectory(tmppath) < 0)
2486 ereport(ERROR,
2488 errmsg("could not create directory \"%s\": %m",
2489 tmppath)));
2490 fsync_fname(tmppath, true);
2491
2492 /* Write the actual state file. */
2493 slot->dirty = true; /* signal that we really need to write */
2495
2496 /* Rename the directory into place. */
2497 if (rename(tmppath, path) != 0)
2498 ereport(ERROR,
2500 errmsg("could not rename file \"%s\" to \"%s\": %m",
2501 tmppath, path)));
2502
2503 /*
2504 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2505 * would persist after an OS crash or not - so, force a restart. The
2506 * restart would try to fsync this again till it works.
2507 */
2509
2510 fsync_fname(path, true);
2512
2514}

References ReplicationSlot::data, ReplicationSlot::dirty, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg, ERROR, fb(), fsync_fname(), MakePGDirectory(), MAXPGPATH, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, rmtree(), S_ISDIR, SaveSlotToPath(), sprintf, stat::st_mode, START_CRIT_SECTION, and stat.

Referenced by ReplicationSlotCreate().

◆ DetermineSlotInvalidationCause()

static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause ( uint32  possible_causes,
ReplicationSlot * s,
XLogRecPtr  oldestLSN,
Oid  dboid,
TransactionId  snapshotConflictHorizon,
TimestampTz * inactive_since,
TimestampTz  now 
)
static

Definition at line 1884 of file slot.c.

1888{
1890
1892 {
1893 XLogRecPtr restart_lsn = s->data.restart_lsn;
1894
1895 if (XLogRecPtrIsValid(restart_lsn) &&
1896 restart_lsn < oldestLSN)
1897 return RS_INVAL_WAL_REMOVED;
1898 }
1899
1901 {
1902 /* invalid DB oid signals a shared relation */
1903 if (SlotIsLogical(s) &&
1904 (dboid == InvalidOid || dboid == s->data.database))
1905 {
1906 TransactionId effective_xmin = s->effective_xmin;
1908
1909 if (TransactionIdIsValid(effective_xmin) &&
1910 TransactionIdPrecedesOrEquals(effective_xmin,
1911 snapshotConflictHorizon))
1912 return RS_INVAL_HORIZON;
1915 snapshotConflictHorizon))
1916 return RS_INVAL_HORIZON;
1917 }
1918 }
1919
1921 {
1922 if (SlotIsLogical(s))
1923 return RS_INVAL_WAL_LEVEL;
1924 }
1925
1927 {
1928 Assert(now > 0);
1929
1930 if (CanInvalidateIdleSlot(s))
1931 {
1932 /*
1933 * Simulate the invalidation due to idle_timeout to test the
1934 * timeout behavior promptly, without waiting for it to trigger
1935 * naturally.
1936 */
1937#ifdef USE_INJECTION_POINTS
1938 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1939 {
1940 *inactive_since = 0; /* since the beginning of time */
1941 return RS_INVAL_IDLE_TIMEOUT;
1942 }
1943#endif
1944
1945 /*
1946 * Check if the slot needs to be invalidated due to
1947 * idle_replication_slot_timeout GUC.
1948 */
1951 {
1952 *inactive_since = s->inactive_since;
1953 return RS_INVAL_IDLE_TIMEOUT;
1954 }
1955 }
1956 }
1957
1958 return RS_INVAL_NONE;
1959}

References Assert, CanInvalidateIdleSlot(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), idle_replication_slot_timeout_secs, ReplicationSlot::inactive_since, InvalidOid, IS_INJECTION_POINT_ATTACHED, now(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_HORIZON, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, RS_INVAL_WAL_REMOVED, SlotIsLogical, TimestampDifferenceExceedsSeconds(), TransactionIdIsValid, TransactionIdPrecedesOrEquals(), and XLogRecPtrIsValid.

Referenced by InvalidatePossiblyObsoleteSlot().

◆ GetSlotInvalidationCause()

ReplicationSlotInvalidationCause GetSlotInvalidationCause ( const char * cause_name)

Definition at line 2932 of file slot.c.

2933{
2934 Assert(cause_name);
2935
2936 /* Search lookup table for the cause having this name */
2937 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2938 {
2939 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2941 }
2942
2943 Assert(false);
2944 return RS_INVAL_NONE; /* to keep compiler quiet */
2945}

References Assert, SlotInvalidationCauseMap::cause, fb(), i, RS_INVAL_MAX_CAUSES, RS_INVAL_NONE, and SlotInvalidationCauses.

Referenced by fetch_remote_slots().

◆ GetSlotInvalidationCauseName()

const char * GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause  cause)

Definition at line 2952 of file slot.c.

2953{
2954 /* Search lookup table for the name of this cause */
2955 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2956 {
2957 if (SlotInvalidationCauses[i].cause == cause)
2959 }
2960
2961 Assert(false);
2962 return "none"; /* to keep compiler quiet */
2963}

References Assert, SlotInvalidationCauseMap::cause_name, i, RS_INVAL_MAX_CAUSES, and SlotInvalidationCauses.

Referenced by pg_get_replication_slots(), and ReplicationSlotAcquire().

◆ InvalidateObsoleteReplicationSlots()

bool InvalidateObsoleteReplicationSlots ( uint32  possible_causes,
XLogSegNo  oldestSegno,
Oid  dboid,
TransactionId  snapshotConflictHorizon 
)

Definition at line 2216 of file slot.c.

2219{
2221 bool invalidated = false;
2222 bool invalidated_logical = false;
2224
2225 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2228
2230 return invalidated;
2231
2233
2234restart:
2238 {
2240 bool released_lock = false;
2241
2242 if (!s->in_use)
2243 continue;
2244
2245 /* Prevent invalidation of logical slots during binary upgrade */
2247 {
2251
2252 continue;
2253 }
2254
2256 dboid, snapshotConflictHorizon,
2257 &released_lock))
2258 {
2260
2261 /* Remember we have invalidated a physical or logical slot */
2262 invalidated = true;
2263
2264 /*
2265 * Additionally, remember we have invalidated a logical slot as we
2266 * can request disabling logical decoding later.
2267 */
2268 if (SlotIsLogical(s))
2269 invalidated_logical = true;
2270 }
2271 else
2272 {
2273 /*
2274 * We need to check if the slot is invalidated here since
2275 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2276 * is already invalidated.
2277 */
2282 }
2283
2284 /* if the lock was released, start from scratch */
2285 if (released_lock)
2286 goto restart;
2287 }
2289
2290 /*
2291 * If any slots have been invalidated, recalculate the resource limits.
2292 */
2293 if (invalidated)
2294 {
2297 }
2298
2299 /*
2300 * Request the checkpointer to disable logical decoding if no valid
2301 * logical slots remain. If called by the checkpointer during a
2302 * checkpoint, only the request is initiated; actual deactivation is
2303 * deferred until after the checkpoint completes.
2304 */
2307
2308 return invalidated;
2309}

References Assert, ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidatePossiblyObsoleteSlot(), IsBinaryUpgrade, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RequestDisableLogicalDecoding(), RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by CreateCheckPoint(), CreateRestartPoint(), ResolveRecoveryConflictWithSnapshot(), and xlog_redo().

◆ InvalidatePossiblyObsoleteSlot()

static bool InvalidatePossiblyObsoleteSlot ( uint32  possible_causes,
ReplicationSlot * s,
XLogRecPtr  oldestLSN,
Oid  dboid,
TransactionId  snapshotConflictHorizon,
bool * released_lock_out 
)
static

Definition at line 1976 of file slot.c.

1981{
1982 int last_signaled_pid = 0;
1983 bool released_lock = false;
1984 bool invalidated = false;
1985 TimestampTz inactive_since = 0;
1986
1987 for (;;)
1988 {
1989 XLogRecPtr restart_lsn;
1990 NameData slotname;
1991 ProcNumber active_proc;
1992 int active_pid = 0;
1994 TimestampTz now = 0;
1995 long slot_idle_secs = 0;
1996
1998
1999 if (!s->in_use)
2000 {
2001 if (released_lock)
2003 break;
2004 }
2005
2007 {
2008 /*
2009 * Assign the current time here to avoid system call overhead
2010 * while holding the spinlock in subsequent code.
2011 */
2013 }
2014
2015 /*
2016 * Check if the slot needs to be invalidated. If it needs to be
2017 * invalidated, and is not currently acquired, acquire it and mark it
2018 * as having been invalidated. We do this with the spinlock held to
2019 * avoid race conditions -- for example the restart_lsn could move
2020 * forward, or the slot could be dropped.
2021 */
2023
2024 restart_lsn = s->data.restart_lsn;
2025
2026 /* we do nothing if the slot is already invalid */
2027 if (s->data.invalidated == RS_INVAL_NONE)
2029 s, oldestLSN,
2030 dboid,
2031 snapshotConflictHorizon,
2032 &inactive_since,
2033 now);
2034
2035 /* if there's no invalidation, we're done */
2037 {
2039 if (released_lock)
2041 break;
2042 }
2043
2044 slotname = s->data.name;
2045 active_proc = s->active_proc;
2046
2047 /*
2048 * If the slot can be acquired, do so and mark it invalidated
2049 * immediately. Otherwise we'll signal the owning process, below, and
2050 * retry.
2051 *
2052 * Note: Unlike other slot attributes, slot's inactive_since can't be
2053 * changed until the acquired slot is released or the owning process
2054 * is terminated. So, the inactive slot can only be invalidated
2055 * immediately without being terminated.
2056 */
2057 if (active_proc == INVALID_PROC_NUMBER)
2058 {
2062
2063 /*
2064 * XXX: We should consider not overwriting restart_lsn and instead
2065 * just rely on .invalidated.
2066 */
2068 {
2071 }
2072
2073 /* Let caller know */
2074 invalidated = true;
2075 }
2076 else
2077 {
2078 active_pid = GetPGProcByNumber(active_proc)->pid;
2079 Assert(active_pid != 0);
2080 }
2081
2083
2084 /*
2085 * Calculate the idle time duration of the slot if slot is marked
2086 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2087 */
2089 {
2090 int slot_idle_usecs;
2091
2092 TimestampDifference(inactive_since, now, &slot_idle_secs,
2094 }
2095
2096 if (active_proc != INVALID_PROC_NUMBER)
2097 {
2098 /*
2099 * Prepare the sleep on the slot's condition variable before
2100 * releasing the lock, to close a possible race condition if the
2101 * slot is released before the sleep below.
2102 */
2104
2106 released_lock = true;
2107
2108 /*
2109 * Signal to terminate the process that owns the slot, if we
2110 * haven't already signalled it. (Avoidance of repeated
2111 * signalling is the only reason for there to be a loop in this
2112 * routine; otherwise we could rely on caller's restart loop.)
2113 *
2114 * There is the race condition that other process may own the slot
2115 * after its current owner process is terminated and before this
2116 * process owns it. To handle that, we signal only if the PID of
2117 * the owning process has changed from the previous time. (This
2118 * logic assumes that the same PID is not reused very quickly.)
2119 */
2121 {
2123 slotname, restart_lsn,
2124 oldestLSN, snapshotConflictHorizon,
2126
2127 if (MyBackendType == B_STARTUP)
2129 active_pid,
2131 else
2132 (void) kill(active_pid, SIGTERM);
2133
2135 }
2136
2137 /* Wait until the slot is released. */
2140
2141 /*
2142 * Re-acquire lock and start over; we expect to invalidate the
2143 * slot next time (unless another process acquires the slot in the
2144 * meantime).
2145 *
2146 * Note: It is possible for a slot to advance its restart_lsn or
2147 * xmin values sufficiently between when we release the mutex and
2148 * when we recheck, moving from a conflicting state to a non
2149 * conflicting state. This is intentional and safe: if the slot
2150 * has caught up while we're busy here, the resources we were
2151 * concerned about (WAL segments or tuples) have not yet been
2152 * removed, and there's no reason to invalidate the slot.
2153 */
2155 continue;
2156 }
2157 else
2158 {
2159 /*
2160 * We hold the slot now and have already invalidated it; flush it
2161 * to ensure that state persists.
2162 *
2163 * Don't want to hold ReplicationSlotControlLock across file
2164 * system operations, so release it now but be sure to tell caller
2165 * to restart from scratch.
2166 */
2168 released_lock = true;
2169
2170 /* Make sure the invalidated state persists across server restart */
2174
2176 slotname, restart_lsn,
2177 oldestLSN, snapshotConflictHorizon,
2179
2180 /* done with this slot for now */
2181 break;
2182 }
2183 }
2184
2186
2188 return invalidated;
2189}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, B_STARTUP, ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DetermineSlotInvalidationCause(), fb(), GetCurrentTimestamp(), GetPGProcByNumber, ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, kill, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockHeldByMeInMode(), LWLockRelease(), ReplicationSlot::mutex, MyBackendType, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::name, now(), RECOVERY_CONFLICT_LOGICALSLOT, ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReportSlotInvalidation(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_REMOVED, SignalRecoveryConflict(), SpinLockAcquire(), SpinLockRelease(), and TimestampDifference().

Referenced by InvalidateObsoleteReplicationSlots().

◆ IsSlotForConflictCheck()

static bool IsSlotForConflictCheck ( const char * name)
static

Definition at line 360 of file slot.c.

361{
362 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
363}

References CONFLICT_DETECTION_SLOT, fb(), and name.

Referenced by ReplicationSlotAcquire(), and ReplicationSlotValidateNameInternal().

◆ ReplicationSlotAcquire()

void ReplicationSlotAcquire ( const char * name,
bool  nowait,
bool  error_if_invalid 
)

Definition at line 629 of file slot.c.

630{
632 ProcNumber active_proc;
633 int active_pid;
634
635 Assert(name != NULL);
636
637retry:
639
641
642 /* Check if the slot exists with the given name. */
644 if (s == NULL || !s->in_use)
645 {
647
650 errmsg("replication slot \"%s\" does not exist",
651 name)));
652 }
653
654 /*
655 * Do not allow users to acquire the reserved slot. This scenario may
656 * occur if the launcher that owns the slot has terminated unexpectedly
657 * due to an error, and a backend process attempts to reuse the slot.
658 */
662 errmsg("cannot acquire replication slot \"%s\"", name),
663 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
664
665 /*
666 * This is the slot we want; check if it's active under some other
667 * process. In single user mode, we don't need this check.
668 */
670 {
671 /*
672 * Get ready to sleep on the slot in case it is active. (We may end
673 * up not sleeping, but we don't want to do this while holding the
674 * spinlock.)
675 */
676 if (!nowait)
678
679 /*
680 * It is important to reset the inactive_since under spinlock here to
681 * avoid race conditions with slot invalidation. See comments related
682 * to inactive_since in InvalidatePossiblyObsoleteSlot.
683 */
687 active_proc = s->active_proc;
690 }
691 else
692 {
693 s->active_proc = active_proc = MyProcNumber;
695 }
696 active_pid = GetPGProcByNumber(active_proc)->pid;
698
699 /*
700 * If we found the slot but it's already active in another process, we
701 * wait until the owning process signals us that it's been released, or
702 * error out.
703 */
704 if (active_proc != MyProcNumber)
705 {
706 if (!nowait)
707 {
708 /* Wait here until we get signaled, and then restart */
712 goto retry;
713 }
714
717 errmsg("replication slot \"%s\" is active for PID %d",
718 NameStr(s->data.name), active_pid)));
719 }
720 else if (!nowait)
721 ConditionVariableCancelSleep(); /* no sleep needed after all */
722
723 /* We made this slot active, so it's ours now. */
725
726 /*
727 * We need to check for invalidation after making the slot ours to avoid
728 * the possible race condition with the checkpointer that can otherwise
729 * invalidate the slot immediately after the check.
730 */
734 errmsg("can no longer access replication slot \"%s\"",
735 NameStr(s->data.name)),
736 errdetail("This replication slot has been invalidated due to \"%s\".",
738
739 /* Let everybody know we've modified this slot */
741
742 /*
743 * The call to pgstat_acquire_replslot() protects against stats for a
744 * different slot, from before a restart or such, being present during
745 * pgstat_report_replslot().
746 */
747 if (SlotIsLogical(s))
749
750
751 if (am_walsender)
752 {
755 ? errmsg("acquired logical replication slot \"%s\"",
756 NameStr(s->data.name))
757 : errmsg("acquired physical replication slot \"%s\"",
758 NameStr(s->data.name)));
759 }
760}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, am_walsender, Assert, ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), ReplicationSlot::data, DEBUG1, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetPGProcByNumber, GetSlotInvalidationCauseName(), ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, IsLogicalLauncher(), IsSlotForConflictCheck(), IsUnderPostmaster, LOG, log_replication_commands, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, pgstat_acquire_replslot(), ReplicationSlotSetInactiveSince(), RS_INVAL_NONE, SearchNamedReplicationSlot(), SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by acquire_conflict_slot_if_exists(), binary_upgrade_check_logical_slot_pending_wal(), drop_local_obsolete_slots(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), ReplicationSlotAlter(), ReplicationSlotDrop(), StartLogicalReplication(), StartReplication(), and synchronize_one_slot().

◆ ReplicationSlotAlter()

void ReplicationSlotAlter ( const char * name,
const bool * failover,
const bool * two_phase 
)

Definition at line 946 of file slot.c.

948{
949 bool update_slot = false;
950
953
954 ReplicationSlotAcquire(name, false, true);
955
959 errmsg("cannot use %s with a physical replication slot",
960 "ALTER_REPLICATION_SLOT"));
961
962 if (RecoveryInProgress())
963 {
964 /*
965 * Do not allow users to alter the slots which are currently being
966 * synced from the primary to the standby.
967 */
971 errmsg("cannot alter replication slot \"%s\"", name),
972 errdetail("This replication slot is being synchronized from the primary server."));
973
974 /*
975 * Do not allow users to enable failover on the standby as we do not
976 * support sync to the cascading standby.
977 */
978 if (failover && *failover)
981 errmsg("cannot enable failover for a replication slot"
982 " on the standby"));
983 }
984
985 if (failover)
986 {
987 /*
988 * Do not allow users to enable failover for temporary slots as we do
989 * not support syncing temporary slots to the standby.
990 */
994 errmsg("cannot enable failover for a temporary replication slot"));
995
997 {
1001
1002 update_slot = true;
1003 }
1004 }
1005
1007 {
1011
1012 update_slot = true;
1013 }
1014
1015 if (update_slot)
1016 {
1019 }
1020
1022}

References Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg, ERROR, failover, ReplicationSlotPersistentData::failover, fb(), ReplicationSlot::mutex, MyReplicationSlot, name, ReplicationSlotPersistentData::persistency, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), RS_TEMPORARY, SlotIsPhysical, SpinLockAcquire(), SpinLockRelease(), ReplicationSlotPersistentData::synced, two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by AlterReplicationSlot().

◆ ReplicationSlotCleanup()

void ReplicationSlotCleanup ( bool  synced_only)

Definition at line 861 of file slot.c.

862{
863 int i;
865 bool dropped_logical = false;
866
868
869restart:
873 {
875
876 if (!s->in_use)
877 continue;
878
880
883
884 if ((s->active_proc == MyProcNumber &&
885 (!synced_only || s->data.synced)))
886 {
889 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
890
891 if (SlotIsLogical(s))
892 dropped_logical = true;
893
895
897 goto restart;
898 }
899 else
901 }
902
904
907}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, ConditionVariableBroadcast(), ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropPtr(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and ReplicationSlotPersistentData::synced.

Referenced by PostgresMain(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), SyncReplicationSlots(), and WalSndErrorCleanup().

◆ ReplicationSlotCreate()

void ReplicationSlotCreate ( const char *name,
bool  db_specific,
ReplicationSlotPersistency  persistency,
bool  two_phase,
bool  repack,
bool  failover,
bool  synced 
)

Definition at line 378 of file slot.c.

381{
382 ReplicationSlot *slot = NULL;
383 int startpoint,
384 endpoint;
385
387
388 /*
389 * The logical launcher or pg_upgrade may create or migrate an internal
390 * slot, so using a reserved name is allowed in these cases.
391 */
393 ERROR);
394
395 if (failover)
396 {
397 /*
398 * Do not allow users to create the failover enabled slots on the
399 * standby as we do not support sync to the cascading standby.
400 *
401 * However, failover enabled slots can be created during slot
402 * synchronization because we need to retain the same values as the
403 * remote slot.
404 */
408 errmsg("cannot enable failover for a replication slot created on the standby"));
409
410 /*
411 * Do not allow users to create failover enabled temporary slots,
412 * because temporary slots will not be synced to the standby.
413 *
414 * However, failover enabled temporary slots can be created during
415 * slot synchronization. See the comments atop slotsync.c for details.
416 */
417 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
420 errmsg("cannot enable failover for a temporary replication slot"));
421 }
422
423 /*
424 * If some other backend ran this code concurrently with us, we'd likely
425 * both allocate the same slot, and that would be bad. We'd also be at
426 * risk of missing a name collision. Also, we don't want to try to create
427 * a new slot while somebody's busy cleaning up an old one, because we
428 * might both be monkeying with the same directory.
429 */
431
432 /*
433 * Check for name collision (across the whole array), and identify an
434 * allocatable slot (in the array slice specific to our current use case:
435 * either general, or REPACK only). We need to hold
436 * ReplicationSlotControlLock in shared mode for this, so that nobody else
437 * can change the in_use flags while we're looking at them.
438 */
440 startpoint = !repack ? 0 : max_replication_slots;
443 {
445
446 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
449 errmsg("replication slot \"%s\" already exists", name)));
450
451 if (i >= startpoint && i < endpoint &&
452 !s->in_use && slot == NULL)
453 slot = s;
454 }
456
457 /* If all slots are in use, we're out of luck. */
458 if (slot == NULL)
461 errmsg("all replication slots are in use"),
462 errhint("Free one or increase \"%s\".",
463 repack ? "max_repack_replication_slots" : "max_replication_slots")));
464
465 /*
466 * Since this slot is not in use, nobody should be looking at any part of
467 * it other than the in_use field unless they're trying to allocate it.
468 * And since we hold ReplicationSlotAllocationLock, nobody except us can
469 * be doing that. So it's safe to initialize the slot.
470 */
471 Assert(!slot->in_use);
473
474 /* first initialize persistent data */
475 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
476 namestrcpy(&slot->data.name, name);
478 slot->data.persistency = persistency;
479 slot->data.two_phase = two_phase;
481 slot->data.failover = failover;
482 slot->data.synced = synced;
483
484 /* and then data only present in shared memory */
485 slot->just_dirtied = false;
486 slot->dirty = false;
495 slot->inactive_since = 0;
497
498 /*
499 * Create the slot on disk. We haven't actually marked the slot allocated
500 * yet, so no special cleanup is required if this errors out.
501 */
502 CreateSlotOnDisk(slot);
503
504 /*
505 * We need to briefly prevent any other backend from iterating over the
506 * slots while we flip the in_use flag. We also need to set the active
507 * flag while holding the ControlLock as otherwise a concurrent
508 * ReplicationSlotAcquire() could acquire the slot as well.
509 */
511
512 slot->in_use = true;
513
514 /* We can now mark the slot active, and that makes it our slot. */
515 SpinLockAcquire(&slot->mutex);
518 SpinLockRelease(&slot->mutex);
519 MyReplicationSlot = slot;
520
522
523 /*
524 * Create statistics entry for the new logical slot. We don't collect any
525 * stats for physical slots, so no need to create an entry for the same.
526 * See ReplicationSlotDropPtr for why we need to do this before releasing
527 * ReplicationSlotAllocationLock.
528 */
529 if (SlotIsLogical(slot))
531
532 /*
533 * Now that the slot has been marked as in_use and active, it's safe to
534 * let somebody else try to allocate a slot.
535 */
537
538 /* Let everybody know we've modified this slot */
540}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, Assert, ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, ConditionVariableBroadcast(), CreateSlotOnDisk(), ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::dirty, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg, ERROR, failover, ReplicationSlotPersistentData::failover, fb(), i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, INVALID_PROC_NUMBER, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsBinaryUpgrade, IsLogicalLauncher(), IsSyncingReplicationSlots(), ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, MyDatabaseId, MyProcNumber, MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, pgstat_create_replslot(), RecoveryInProgress(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotValidateName(), RS_TEMPORARY, SlotIsLogical, ReplicationSlot::slotsync_skip_reason, SpinLockAcquire(), SpinLockRelease(), SS_SKIP_NONE, ReplicationSlotPersistentData::synced, two_phase, ReplicationSlotPersistentData::two_phase, and ReplicationSlotPersistentData::two_phase_at.

Referenced by create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateReplicationSlot(), repack_setup_logical_decoding(), and synchronize_one_slot().

◆ ReplicationSlotDrop()

void ReplicationSlotDrop ( const char *name,
bool  nowait 
)

Definition at line 915 of file slot.c.

916{
917 ReplicationSlotAcquire(name, nowait, false);
918
919 /*
920 * Do not allow users to drop the slots which are currently being synced
921 * from the primary to the standby.
922 */
926 errmsg("cannot drop replication slot \"%s\"", name),
927 errdetail("This replication slot is being synchronized from the primary server."));
928
930}

References ReplicationSlot::data, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), MyReplicationSlot, name, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), SlotIsLogical, and ReplicationSlotPersistentData::synced.

Referenced by DropReplicationSlot(), and pg_drop_replication_slot().

◆ ReplicationSlotDropAcquired()

void ReplicationSlotDropAcquired ( bool  try_disable)

Definition at line 1031 of file slot.c.

1032{
1033 ReplicationSlot *slot;
1034
1036 slot = MyReplicationSlot;
1037
1038 /* Can only disable logical decoding if slot is logical */
1039 Assert(!try_disable || SlotIsLogical(slot));
1040
1041 /* slot isn't acquired anymore */
1043
1045
1046 if (try_disable)
1048}

References Assert, fb(), MyReplicationSlot, ReplicationSlotDropPtr(), RequestDisableLogicalDecoding(), and SlotIsLogical.

Referenced by ApplyLauncherMain(), drop_local_obsolete_slots(), repack_cleanup_logical_decoding(), ReplicationSlotDrop(), ReplicationSlotRelease(), and ReplicationSlotsDropDBSlots().

◆ ReplicationSlotDropPtr()

static void ReplicationSlotDropPtr ( ReplicationSlot *slot)
static

Definition at line 1055 of file slot.c.

1056{
1057 char path[MAXPGPATH];
1058 char tmppath[MAXPGPATH];
1059
1060 /*
1061 * If some other backend ran this code concurrently with us, we might try
1062 * to delete a slot with a certain name while someone else was trying to
1063 * create a slot with the same name.
1064 */
1066
1067 /* Generate pathnames. */
1068 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1069 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1070
1071 /*
1072 * Rename the slot directory on disk, so that we'll no longer recognize
1073 * this as a valid slot. Note that if this fails, we've got to mark the
1074 * slot inactive before bailing out. If we're dropping an ephemeral or a
1075 * temporary slot, we better never fail hard as the caller won't expect
1076 * the slot to survive and this might get called during error handling.
1077 */
1078 if (rename(path, tmppath) == 0)
1079 {
1080 /*
1081 * We need to fsync() the directory we just renamed and its parent to
1082 * make sure that our changes are on disk in a crash-safe fashion. If
1083 * fsync() fails, we can't be sure whether the changes are on disk or
1084 * not. For now, we handle that by panicking;
1085 * StartupReplicationSlots() will try to straighten it out after
1086 * restart.
1087 */
1089 fsync_fname(tmppath, true);
1092 }
1093 else
1094 {
1095 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1096
1097 SpinLockAcquire(&slot->mutex);
1099 SpinLockRelease(&slot->mutex);
1100
1101 /* wake up anyone waiting on this slot */
1103
1106 errmsg("could not rename file \"%s\" to \"%s\": %m",
1107 path, tmppath)));
1108 }
1109
1110 /*
1111 * The slot is definitely gone. Lock out concurrent scans of the array
1112 * long enough to kill it. It's OK to clear the active PID here without
1113 * grabbing the mutex because nobody else can be scanning the array here,
1114 * and nobody can be attached to this slot and thus access it without
1115 * scanning the array.
1116 *
1117 * Also wake up processes waiting for it.
1118 */
1121 slot->in_use = false;
1124
1125 /*
1126 * Slot is dead and doesn't prevent resource removal anymore, recompute
1127 * limits.
1128 */
1131
1132 /*
1133 * If removing the directory fails, the worst thing that will happen is
1134 * that the user won't be able to create a new slot with the same name
1135 * until the next server restart. We warn about it, but that's all.
1136 */
1137 if (!rmtree(tmppath, true))
1139 (errmsg("could not remove directory \"%s\"", tmppath)));
1140
1141 /*
1142 * Drop the statistics entry for the replication slot. Do this while
1143 * holding ReplicationSlotAllocationLock so that we don't drop a
1144 * statistics entry for another slot with the same name just created in
1145 * another session.
1146 */
1147 if (SlotIsLogical(slot))
1149
1150 /*
1151 * We release this at the very end, so that nobody starts trying to create
1152 * a slot while we're still cleaning up the detritus of the old one.
1153 */
1155}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, ConditionVariableBroadcast(), ReplicationSlot::data, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg, ERROR, fb(), fsync_fname(), ReplicationSlot::in_use, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotPersistentData::persistency, PG_REPLSLOT_DIR, pgstat_drop_replslot(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), rmtree(), RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), sprintf, START_CRIT_SECTION, and WARNING.

Referenced by ReplicationSlotCleanup(), and ReplicationSlotDropAcquired().

◆ ReplicationSlotIndex()

◆ ReplicationSlotInitialize()

void ReplicationSlotInitialize ( void  )

Definition at line 240 of file slot.c.

References before_shmem_exit(), and ReplicationSlotShmemExit().

Referenced by BaseInit().

◆ ReplicationSlotMarkDirty()

◆ ReplicationSlotName()

bool ReplicationSlotName ( int  index,
Name  name 
)

Definition at line 598 of file slot.c.

599{
600 ReplicationSlot *slot;
601 bool found;
602
604
605 /*
606 * Ensure that the slot cannot be dropped while we copy the name. Don't
607 * need the spinlock as the name of an existing slot cannot change.
608 */
610 found = slot->in_use;
611 if (slot->in_use)
614
615 return found;
616}

References ReplicationSlot::data, fb(), ReplicationSlot::in_use, LW_SHARED, LWLockAcquire(), LWLockRelease(), name, ReplicationSlotPersistentData::name, NameStr, namestrcpy(), ReplicationSlotCtlData::replication_slots, and ReplicationSlotCtl.

Referenced by pgstat_replslot_to_serialized_name_cb().

◆ ReplicationSlotPersist()

◆ ReplicationSlotRelease()

void ReplicationSlotRelease ( void  )

Definition at line 769 of file slot.c.

770{
772 char *slotname = NULL; /* keep compiler quiet */
773 bool is_logical;
774 TimestampTz now = 0;
775
776 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
777
779
780 if (am_walsender)
781 slotname = pstrdup(NameStr(slot->data.name));
782
783 if (slot->data.persistency == RS_EPHEMERAL)
784 {
785 /*
786 * If slot is ephemeral, we drop it upon release, and request logical
787 * decoding be disabled.
788 */
790 }
791 else
792 {
793 /*
794 * If slot needed to temporarily restrain both data and catalog xmin
795 * to create the catalog snapshot, remove that temporary constraint.
796 * Snapshots can only be exported while the initial snapshot is still
797 * acquired.
798 */
799 if (!TransactionIdIsValid(slot->data.xmin) &&
801 {
802 SpinLockAcquire(&slot->mutex);
804 SpinLockRelease(&slot->mutex);
806 }
807
808 /*
809 * Set the time since the slot has become inactive. We get the current
810 * time beforehand to avoid system call while holding the spinlock.
811 */
813
814 if (slot->data.persistency == RS_PERSISTENT)
815 {
816 /*
817 * Mark persistent slot inactive. We're not freeing it, just
818 * disconnecting, but wake up others that may be waiting for it.
819 */
820 SpinLockAcquire(&slot->mutex);
823 SpinLockRelease(&slot->mutex);
825 }
826 else
828
830 }
831
832 /* might not have been set when we've been a plain slot */
837
838 if (am_walsender)
839 {
842 ? errmsg("released logical replication slot \"%s\"",
843 slotname)
844 : errmsg("released physical replication slot \"%s\"",
845 slotname));
846
847 pfree(slotname);
848 }
849}

References ReplicationSlot::active_cv, ReplicationSlot::active_proc, am_walsender, Assert, ConditionVariableBroadcast(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, ereport, errmsg, fb(), GetCurrentTimestamp(), INVALID_PROC_NUMBER, InvalidTransactionId, LOG, log_replication_commands, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, now(), ReplicationSlotPersistentData::persistency, pfree(), PGPROC::pgxactoff, ProcGlobal, pstrdup(), ReplicationSlotDropAcquired(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotSetInactiveSince(), RS_EPHEMERAL, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), PGPROC::statusFlags, PROC_HDR::statusFlags, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by binary_upgrade_check_logical_slot_pending_wal(), binary_upgrade_create_conflict_detection_slot(), copy_replication_slot(), CreateReplicationSlot(), InvalidatePossiblyObsoleteSlot(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_replication_slot_advance(), PostgresMain(), ReplicationSlotAlter(), ReplicationSlotShmemExit(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), synchronize_one_slot(), and WalSndErrorCleanup().

◆ ReplicationSlotReserveWal()

void ReplicationSlotReserveWal ( void  )

Definition at line 1707 of file slot.c.

1708{
1710 XLogSegNo segno;
1711 XLogRecPtr restart_lsn;
1712
1713 Assert(slot != NULL);
1716
1717 /*
1718 * The replication slot mechanism is used to prevent the removal of
1719 * required WAL.
1720 *
1721 * Acquire an exclusive lock to prevent the checkpoint process from
1722 * concurrently computing the minimum slot LSN (see
1723 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1724 * replication cannot be removed during a checkpoint.
1725 *
1726 * The mechanism is reliable because if WAL reservation occurs first, the
1727 * checkpoint must wait for the restart_lsn update before determining the
1728 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1729 * first, subsequent WAL reservations will select positions at or beyond
1730 * the redo pointer of that checkpoint.
1731 */
1733
1734 /*
1735 * For logical slots log a standby snapshot and start logical decoding at
1736 * exactly that position. That allows the slot to start up more quickly.
1737 * But on a standby we cannot do WAL writes, so just use the replay
1738 * pointer; effectively, an attempt to create a logical slot on standby
1739 * will cause it to wait for an xl_running_xact record to be logged
1740 * independently on the primary, so that a snapshot can be built using the
1741 * record.
1742 *
1743 * None of this is needed (or indeed helpful) for physical slots as
1744 * they'll start replay at the last logged checkpoint anyway. Instead,
1745 * return the location of the last redo LSN, where a base backup has to
1746 * start replay at.
1747 */
1748 if (SlotIsPhysical(slot))
1749 restart_lsn = GetRedoRecPtr();
1750 else if (RecoveryInProgress())
1751 restart_lsn = GetXLogReplayRecPtr(NULL);
1752 else
1753 restart_lsn = GetXLogInsertRecPtr();
1754
1755 SpinLockAcquire(&slot->mutex);
1756 slot->data.restart_lsn = restart_lsn;
1757 SpinLockRelease(&slot->mutex);
1758
1759 /* prevent WAL removal as fast as possible */
1761
1762 /* Checkpoint shouldn't remove the required WAL. */
1764 if (XLogGetLastRemovedSegno() >= segno)
1765 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1766 NameStr(slot->data.name));
1767
1769
1770 if (!RecoveryInProgress() && SlotIsLogical(slot))
1771 {
1773
1774 /* make sure we have enough information to start */
1776
1777 /* and make sure it's fsynced to disk */
1779 }
1780}

References Assert, ReplicationSlot::data, elog, ERROR, fb(), GetRedoRecPtr(), GetXLogInsertRecPtr(), GetXLogReplayRecPtr(), ReplicationSlot::last_saved_restart_lsn, LogStandbySnapshot(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SlotIsLogical, SlotIsPhysical, SpinLockAcquire(), SpinLockRelease(), wal_segment_size, XLByteToSeg, XLogFlush(), XLogGetLastRemovedSegno(), and XLogRecPtrIsValid.

Referenced by create_physical_replication_slot(), CreateInitDecodingContext(), and CreateReplicationSlot().

◆ ReplicationSlotSave()

◆ ReplicationSlotsComputeLogicalRestartLSN()

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN ( void  )

Definition at line 1374 of file slot.c.

1375{
1377 int i;
1378
1380 return InvalidXLogRecPtr;
1381
1383
1385 {
1386 ReplicationSlot *s;
1387 XLogRecPtr restart_lsn;
1388 XLogRecPtr last_saved_restart_lsn;
1389 bool invalidated;
1390 ReplicationSlotPersistency persistency;
1391
1393
1394 /* cannot change while ReplicationSlotControlLock is held */
1395 if (!s->in_use)
1396 continue;
1397
1398 /* we're only interested in logical slots */
1399 if (!SlotIsLogical(s))
1400 continue;
1401
1402 /* read once, it's ok if it increases while we're checking */
1404 persistency = s->data.persistency;
1405 restart_lsn = s->data.restart_lsn;
1406 invalidated = s->data.invalidated != RS_INVAL_NONE;
1407 last_saved_restart_lsn = s->last_saved_restart_lsn;
1409
1410 /* invalidated slots need not apply */
1411 if (invalidated)
1412 continue;
1413
1414 /*
1415 * For persistent slot use last_saved_restart_lsn to compute the
1416 * oldest LSN for removal of WAL segments. The segments between
1417 * last_saved_restart_lsn and restart_lsn might be needed by a
1418 * persistent slot in the case of database crash. Non-persistent
1419 * slots can't survive the database crash, so we don't care about
1420 * last_saved_restart_lsn for them.
1421 */
1422 if (persistency == RS_PERSISTENT)
1423 {
1424 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1425 restart_lsn > last_saved_restart_lsn)
1426 {
1427 restart_lsn = last_saved_restart_lsn;
1428 }
1429 }
1430
1431 if (!XLogRecPtrIsValid(restart_lsn))
1432 continue;
1433
1434 if (!XLogRecPtrIsValid(result) ||
1435 restart_lsn < result)
1436 result = restart_lsn;
1437 }
1438
1440
1441 return result;
1442}

References ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, result, RS_INVAL_NONE, RS_PERSISTENT, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by CheckPointLogicalRewriteHeap(), and CheckPointSnapBuild().

◆ ReplicationSlotsComputeRequiredLSN()

void ReplicationSlotsComputeRequiredLSN ( void  )

Definition at line 1304 of file slot.c.

1305{
1306 int i;
1308
1310
1313 {
1315 XLogRecPtr restart_lsn;
1316 XLogRecPtr last_saved_restart_lsn;
1317 bool invalidated;
1318 ReplicationSlotPersistency persistency;
1319
1320 if (!s->in_use)
1321 continue;
1322
1324 persistency = s->data.persistency;
1325 restart_lsn = s->data.restart_lsn;
1326 invalidated = s->data.invalidated != RS_INVAL_NONE;
1327 last_saved_restart_lsn = s->last_saved_restart_lsn;
1329
1330 /* invalidated slots need not apply */
1331 if (invalidated)
1332 continue;
1333
1334 /*
1335 * For persistent slot use last_saved_restart_lsn to compute the
1336 * oldest LSN for removal of WAL segments. The segments between
1337 * last_saved_restart_lsn and restart_lsn might be needed by a
1338 * persistent slot in the case of database crash. Non-persistent
1339 * slots can't survive the database crash, so we don't care about
1340 * last_saved_restart_lsn for them.
1341 */
1342 if (persistency == RS_PERSISTENT)
1343 {
1344 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1345 restart_lsn > last_saved_restart_lsn)
1346 {
1347 restart_lsn = last_saved_restart_lsn;
1348 }
1349 }
1350
1351 if (XLogRecPtrIsValid(restart_lsn) &&
1353 restart_lsn < min_required))
1354 min_required = restart_lsn;
1355 }
1357
1359}

References Assert, ReplicationSlot::data, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, ReplicationSlot::last_saved_restart_lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotPersistentData::persistency, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_PERSISTENT, SpinLockAcquire(), SpinLockRelease(), XLogRecPtrIsValid, and XLogSetReplicationSlotMinimumLSN().

Referenced by CheckPointReplicationSlots(), copy_replication_slot(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), ReplicationSlotDropPtr(), ReplicationSlotReserveWal(), reserve_wal_for_local_slot(), StartupReplicationSlots(), and update_local_synced_slot().

◆ ReplicationSlotsComputeRequiredXmin()

void ReplicationSlotsComputeRequiredXmin ( bool  already_locked)

Definition at line 1222 of file slot.c.

1223{
1224 int i;
1227
1232
1233 /*
1234 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1235 * values, so no backend updates the initial xmin for newly created slot
1236 * concurrently. A shared lock is used here to minimize lock contention,
1237 * especially when many slots exist and advancements occur frequently.
1238 * This is safe since an exclusive lock is taken during initial slot xmin
1239 * update in slot creation.
1240 *
1241 * One might think that we can hold the ProcArrayLock exclusively and
1242 * update the slot xmin values, but it could increase lock contention on
1243 * the ProcArrayLock, which is not great since this function can be called
1244 * at non-negligible frequency.
1245 *
1246 * Concurrent invocation of this function may cause the computed slot xmin
1247 * to regress. However, this is harmless because tuples prior to the most
1248 * recent xmin are no longer useful once advancement occurs (see
1249 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1250 * before updating the effective_xmin). Thus, such regression merely
1251 * prevents VACUUM from prematurely removing tuples without causing the
1252 * early deletion of required data.
1253 */
1254 if (!already_locked)
1256
1258 {
1260 TransactionId effective_xmin;
1261 TransactionId effective_catalog_xmin;
1262 bool invalidated;
1263
1264 if (!s->in_use)
1265 continue;
1266
1268 effective_xmin = s->effective_xmin;
1269 effective_catalog_xmin = s->effective_catalog_xmin;
1270 invalidated = s->data.invalidated != RS_INVAL_NONE;
1272
1273 /* invalidated slots need not apply */
1274 if (invalidated)
1275 continue;
1276
1277 /* check the data xmin */
1278 if (TransactionIdIsValid(effective_xmin) &&
1280 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1281 agg_xmin = effective_xmin;
1282
1283 /* check the catalog xmin */
1284 if (TransactionIdIsValid(effective_catalog_xmin) &&
1286 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1287 agg_catalog_xmin = effective_catalog_xmin;
1288 }
1289
1291
1292 if (!already_locked)
1294}

References Assert, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), i, ReplicationSlot::in_use, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockHeldByMeInMode(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ProcArraySetReplicationSlotXmin(), ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, RS_INVAL_NONE, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by copy_replication_slot(), CreateInitDecodingContext(), init_conflict_slot_xmin(), InvalidateObsoleteReplicationSlots(), LogicalConfirmReceivedLocation(), pg_replication_slot_advance(), PhysicalReplicationSlotNewXmin(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), StartupReplicationSlots(), synchronize_one_slot(), update_conflict_slot_xmin(), and update_local_synced_slot().

◆ ReplicationSlotsCountDBSlots()

bool ReplicationSlotsCountDBSlots ( Oid  dboid,
int *nslots,
int *nactive 
)

Definition at line 1453 of file slot.c.

1454{
1455 int i;
1456
1457 *nslots = *nactive = 0;
1458
1460 return false;
1461
1464 {
1465 ReplicationSlot *s;
1466
1468
1469 /* cannot change while ReplicationSlotControlLock is held */
1470 if (!s->in_use)
1471 continue;
1472
1473 /* only logical slots are database specific, skip */
1474 if (!SlotIsLogical(s))
1475 continue;
1476
1477 /* not our database, skip */
1478 if (s->data.database != dboid)
1479 continue;
1480
1481 /* NB: intentionally counting invalidated slots */
1482
1483 /* count slots with spinlock held */
1485 (*nslots)++;
1487 (*nactive)++;
1489 }
1491
1492 if (*nslots > 0)
1493 return true;
1494 return false;
1495}

References ReplicationSlot::active_proc, ReplicationSlot::data, ReplicationSlotPersistentData::database, fb(), i, ReplicationSlot::in_use, INVALID_PROC_NUMBER, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by dropdb().

◆ ReplicationSlotsDropDBSlots()

void ReplicationSlotsDropDBSlots ( Oid  dboid)

Definition at line 1514 of file slot.c.

1515{
1516 int i;
1518 bool dropped = false;
1519
1521 return;
1522
1523restart:
1527 {
1528 ReplicationSlot *s;
1529 char *slotname;
1530 ProcNumber active_proc;
1531
1533
1534 /* cannot change while ReplicationSlotControlLock is held */
1535 if (!s->in_use)
1536 continue;
1537
1538 /* only logical slots are database specific, skip */
1539 if (!SlotIsLogical(s))
1540 continue;
1541
1542 /*
1543 * Check logical slots on other databases too so we can disable
1544 * logical decoding only if no slots in the cluster.
1545 */
1549
1550 /* not our database, skip */
1551 if (s->data.database != dboid)
1552 continue;
1553
1554 /* NB: intentionally including invalidated slots to drop */
1555
1556 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1558 /* can't change while ReplicationSlotControlLock is held */
1559 slotname = NameStr(s->data.name);
1560 active_proc = s->active_proc;
1561 if (active_proc == INVALID_PROC_NUMBER)
1562 {
1565 }
1567
1568 /*
1569 * Even though we hold an exclusive lock on the database object a
1570 * logical slot for that DB can still be active, e.g. if it's
1571 * concurrently being dropped by a backend connected to another DB.
1572 *
1573 * That's fairly unlikely in practice, so we'll just bail out.
1574 *
1575 * The slot sync worker holds a shared lock on the database before
1576 * operating on synced logical slots to avoid conflict with the drop
1577 * happening here. The persistent synced slots are thus safe but there
1578 * is a possibility that the slot sync worker has created a temporary
1579 * slot (which stays active even on release) and we are trying to drop
1580 * that here. In practice, the chances of hitting this scenario are
1581 * less as during slot synchronization, the temporary slot is
1582 * immediately converted to persistent and thus is safe due to the
1583 * shared lock taken on the database. So, we'll just bail out in such
1584 * a case.
1585 *
1586 * XXX: We can consider shutting down the slot sync worker before
1587 * trying to drop synced temporary slots here.
1588 */
1589 if (active_proc != INVALID_PROC_NUMBER)
1590 ereport(ERROR,
1592 errmsg("replication slot \"%s\" is active for PID %d",
1593 slotname, GetPGProcByNumber(active_proc)->pid)));
1594
1595 /*
1596 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1597 * holding ReplicationSlotControlLock over filesystem operations,
1598 * release ReplicationSlotControlLock and use
1599 * ReplicationSlotDropAcquired.
1600 *
1601 * As that means the set of slots could change, restart scan from the
1602 * beginning each time we release the lock.
1603 */
1606 dropped = true;
1607 goto restart;
1608 }
1610
1611 if (dropped && !found_valid_logicalslot)
1613}

References ReplicationSlot::active_proc, ReplicationSlot::data, ReplicationSlotPersistentData::database, ereport, errcode(), errmsg, ERROR, fb(), GetPGProcByNumber, i, ReplicationSlot::in_use, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, MyProcNumber, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotDropAcquired(), RequestDisableLogicalDecoding(), RS_INVAL_NONE, SlotIsLogical, SpinLockAcquire(), and SpinLockRelease().

Referenced by dbase_redo(), and dropdb().

◆ ReplicationSlotShmemExit()

static void ReplicationSlotShmemExit ( int  code,
Datum  arg 
)
static

Definition at line 249 of file slot.c.

250{
251 /* Make sure active replication slots are released */
252 if (MyReplicationSlot != NULL)
254
255 /* Also cleanup all the temporary slots. */
257}

References fb(), MyReplicationSlot, ReplicationSlotCleanup(), and ReplicationSlotRelease().

Referenced by ReplicationSlotInitialize().

◆ ReplicationSlotsShmemInit()

◆ ReplicationSlotsShmemRequest()

static void ReplicationSlotsShmemRequest ( void * arg)
static

Definition at line 200 of file slot.c.

201{
202 Size size;
203
205 return;
206
207 size = offsetof(ReplicationSlotCtlData, replication_slots);
208 size = add_size(size,
210 sizeof(ReplicationSlot)));
211 ShmemRequestStruct(.name = "ReplicationSlot Ctl",
212 .size = size,
213 .ptr = (void **) &ReplicationSlotCtl,
214 );
215}

References add_size(), fb(), max_repack_replication_slots, max_replication_slots, mul_size(), name, ReplicationSlotCtl, and ShmemRequestStruct.

◆ ReplicationSlotValidateName()

bool ReplicationSlotValidateName ( const char * name,
bool  allow_reserved_name,
int  elevel 
)

Definition at line 265 of file slot.c.

267{
268 int err_code;
269 char *err_msg = NULL;
270 char *err_hint = NULL;
271
273 &err_code, &err_msg, &err_hint))
274 {
275 /*
276 * Use errmsg_internal() and errhint_internal() instead of errmsg()
277 * and errhint(), since the messages from
278 * ReplicationSlotValidateNameInternal() are already translated. This
279 * avoids double translation.
280 */
281 ereport(elevel,
283 errmsg_internal("%s", err_msg),
284 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
285
286 pfree(err_msg);
287 if (err_hint != NULL)
289 return false;
290 }
291
292 return true;
293}

References ereport, errcode(), errhint_internal(), errmsg_internal(), fb(), name, pfree(), and ReplicationSlotValidateNameInternal().

Referenced by parse_subscription_options(), ReplicationSlotCreate(), and StartupReorderBuffer().

◆ ReplicationSlotValidateNameInternal()

bool ReplicationSlotValidateNameInternal ( const char * name,
bool  allow_reserved_name,
int * err_code,
char **  err_msg,
char **  err_hint 
)

Definition at line 310 of file slot.c.

312{
313 const char *cp;
314
315 if (strlen(name) == 0)
316 {
318 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
319 *err_hint = NULL;
320 return false;
321 }
322
323 if (strlen(name) >= NAMEDATALEN)
324 {
326 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
327 *err_hint = NULL;
328 return false;
329 }
330
331 for (cp = name; *cp; cp++)
332 {
333 if (!((*cp >= 'a' && *cp <= 'z')
334 || (*cp >= '0' && *cp <= '9')
335 || (*cp == '_')))
336 {
338 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
339 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
340 return false;
341 }
342 }
343
345 {
347 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
348 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 return false;
351 }
352
353 return true;
354}

References _, CONFLICT_DETECTION_SLOT, fb(), IsSlotForConflictCheck(), name, NAMEDATALEN, and psprintf().

Referenced by check_primary_slot_name(), ReplicationSlotValidateName(), and validate_sync_standby_slots().

◆ ReportSlotInvalidation()

static void ReportSlotInvalidation ( ReplicationSlotInvalidationCause  cause,
bool  terminating,
int  pid,
NameData  slotname,
XLogRecPtr  restart_lsn,
XLogRecPtr  oldestLSN,
TransactionId  snapshotConflictHorizon,
long  slot_idle_seconds 
)
static

Definition at line 1786 of file slot.c.

1794{
1797
1800
1801 switch (cause)
1802 {
1804 {
1805 uint64 ex = oldestLSN - restart_lsn;
1806
1808 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1809 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1810 ex),
1811 LSN_FORMAT_ARGS(restart_lsn),
1812 ex);
1813 /* translator: %s is a GUC variable name */
1814 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1815 "max_slot_wal_keep_size");
1816 break;
1817 }
1818 case RS_INVAL_HORIZON:
1819 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1820 snapshotConflictHorizon);
1821 break;
1822
1823 case RS_INVAL_WAL_LEVEL:
1824 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1825 break;
1826
1828 {
1829 /* translator: %s is a GUC variable name */
1830 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1831 slot_idle_seconds, "idle_replication_slot_timeout",
1833 /* translator: %s is a GUC variable name */
1834 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1835 "idle_replication_slot_timeout");
1836 break;
1837 }
1838 case RS_INVAL_NONE:
1840 }
1841
1842 ereport(LOG,
1843 terminating ?
1844 errmsg("terminating process %d to release replication slot \"%s\"",
1845 pid, NameStr(slotname)) :
1846 errmsg("invalidating obsolete replication slot \"%s\"",
1847 NameStr(slotname)),
1849 err_hint.len ? errhint("%s", err_hint.data) : 0);
1850
1851 pfree(err_detail.data);
1852 pfree(err_hint.data);
1853}

References _, appendStringInfo(), appendStringInfoString(), ereport, errdetail_internal(), errhint(), errmsg, fb(), idle_replication_slot_timeout_secs, initStringInfo(), LOG, LSN_FORMAT_ARGS, NameStr, ngettext, pfree(), pg_unreachable, RS_INVAL_HORIZON, RS_INVAL_IDLE_TIMEOUT, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, and RS_INVAL_WAL_REMOVED.

Referenced by InvalidatePossiblyObsoleteSlot().

◆ RestoreSlotFromDisk()

static void RestoreSlotFromDisk ( const char * name)
static

Definition at line 2683 of file slot.c.

2684{
2686 int i;
2687 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2688 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2689 int fd;
2690 bool restored = false;
2691 int readBytes;
2692 pg_crc32c checksum;
2693 TimestampTz now = 0;
2694
2695 /* no need to lock here, no concurrent access allowed yet */
2696
2697 /* delete temp file if it exists */
2698 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2699 sprintf(path, "%s/state.tmp", slotdir);
2700 if (unlink(path) < 0 && errno != ENOENT)
2701 ereport(PANIC,
2703 errmsg("could not remove file \"%s\": %m", path)));
2704
2705 sprintf(path, "%s/state", slotdir);
2706
2707 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2708
2709 /* on some operating systems fsyncing a file requires O_RDWR */
2711
2712 /*
2713 * We do not need to handle this as we are rename()ing the directory into
2714 * place only after we fsync()ed the state file.
2715 */
2716 if (fd < 0)
2717 ereport(PANIC,
2719 errmsg("could not open file \"%s\": %m", path)));
2720
2721 /*
2722 * Sync state file before we're reading from it. We might have crashed
2723 * while it wasn't synced yet and we shouldn't continue on that basis.
2724 */
2726 if (pg_fsync(fd) != 0)
2727 ereport(PANIC,
2729 errmsg("could not fsync file \"%s\": %m",
2730 path)));
2732
2733 /* Also sync the parent directory */
2735 fsync_fname(slotdir, true);
2737
2738 /* read part of statefile that's guaranteed to be version independent */
2743 {
2744 if (readBytes < 0)
2745 ereport(PANIC,
2747 errmsg("could not read file \"%s\": %m", path)));
2748 else
2749 ereport(PANIC,
2751 errmsg("could not read file \"%s\": read %d of %zu",
2752 path, readBytes,
2754 }
2755
2756 /* verify magic */
2757 if (cp.magic != SLOT_MAGIC)
2758 ereport(PANIC,
2760 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2761 path, cp.magic, SLOT_MAGIC)));
2762
2763 /* verify version */
2764 if (cp.version != SLOT_VERSION)
2765 ereport(PANIC,
2767 errmsg("replication slot file \"%s\" has unsupported version %u",
2768 path, cp.version)));
2769
2770 /* boundary check on length */
2771 if (cp.length != ReplicationSlotOnDiskV2Size)
2772 ereport(PANIC,
2774 errmsg("replication slot file \"%s\" has corrupted length %u",
2775 path, cp.length)));
2776
2777 /* Now that we know the size, read the entire file */
2779 readBytes = read(fd,
2781 cp.length);
2783 if (readBytes != cp.length)
2784 {
2785 if (readBytes < 0)
2786 ereport(PANIC,
2788 errmsg("could not read file \"%s\": %m", path)));
2789 else
2790 ereport(PANIC,
2792 errmsg("could not read file \"%s\": read %d of %zu",
2793 path, readBytes, (Size) cp.length)));
2794 }
2795
2796 if (CloseTransientFile(fd) != 0)
2797 ereport(PANIC,
2799 errmsg("could not close file \"%s\": %m", path)));
2800
2801 /* now verify the CRC */
2802 INIT_CRC32C(checksum);
2803 COMP_CRC32C(checksum,
2806 FIN_CRC32C(checksum);
2807
2808 if (!EQ_CRC32C(checksum, cp.checksum))
2809 ereport(PANIC,
2810 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2811 path, checksum, cp.checksum)));
2812
2813 /*
2814 * If we crashed with an ephemeral slot active, don't restore but delete
2815 * it.
2816 */
2817 if (cp.slotdata.persistency != RS_PERSISTENT)
2818 {
2819 if (!rmtree(slotdir, true))
2820 {
2822 (errmsg("could not remove directory \"%s\"",
2823 slotdir)));
2824 }
2826 return;
2827 }
2828
2829 /*
2830 * Verify that requirements for the specific slot type are met. That's
2831 * important because if these aren't met we're not guaranteed to retain
2832 * all the necessary resources for the slot.
2833 *
2834 * NB: We have to do so *after* the above checks for ephemeral slots,
2835 * because otherwise a slot that shouldn't exist anymore could prevent
2836 * restarts.
2837 *
2838 * NB: Changing the requirements here also requires adapting
2839 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2840 */
2841 if (cp.slotdata.database != InvalidOid)
2842 {
2844 ereport(FATAL,
2846 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2847 NameStr(cp.slotdata.name)),
2848 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2849
2850 /*
2851 * In standby mode, the hot standby must be enabled. This check is
2852 * necessary to ensure logical slots are invalidated when they become
2853 * incompatible due to insufficient wal_level. Otherwise, if the
2854 * primary reduces effective_wal_level below logical (thereby disabling
2855 * logical decoding) while hot standby is disabled, logical slots would
2856 * remain valid even after promotion.
2857 */
2859 ereport(FATAL,
2861 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2862 NameStr(cp.slotdata.name)),
2863 errhint("Change \"hot_standby\" to be \"on\".")));
2864 }
2865 else if (wal_level < WAL_LEVEL_REPLICA)
2866 ereport(FATAL,
2868 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2869 NameStr(cp.slotdata.name)),
2870 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2871
2872 /*
2873 * Nothing can be active yet, don't lock anything. Note we iterate up to
2874 * max_replication_slots instead of adding max_repack_replication_slots as
2875 * in all other places, because we must enforce the GUC value in case
2876 * there were more slots before the shutdown than it is currently set to
2877 * allow.
2878 */
2879 for (i = 0; i < max_replication_slots; i++)
2880 {
2881 ReplicationSlot *slot;
2882
2884
2885 if (slot->in_use)
2886 continue;
2887
2888 /* restore the entire set of persistent data */
2889 memcpy(&slot->data, &cp.slotdata,
2891
2892 /* initialize in memory state */
2893 slot->effective_xmin = cp.slotdata.xmin;
2894 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2895 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2896 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2897
2902
2903 slot->in_use = true;
2905
2906 /*
2907 * Set the time since the slot has become inactive after loading the
2908 * slot from the disk into memory. Whoever acquires the slot i.e.
2909 * makes the slot active will reset it. Use the same inactive_since
2910 * time for all the slots.
2911 */
2912 if (now == 0)
2914
2916
2917 restored = true;
2918 break;
2919 }
2920
2921 if (!restored)
2922 ereport(FATAL,
2923 (errmsg("too many replication slots active before shutdown"),
2924 errhint("Increase \"max_replication_slots\" and try again.")));
2925}

References ReplicationSlot::active_proc, ReplicationSlot::candidate_catalog_xmin, ReplicationSlot::candidate_restart_lsn, ReplicationSlot::candidate_restart_valid, ReplicationSlot::candidate_xmin_lsn, CloseTransientFile(), COMP_CRC32C, ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, elog, EnableHotStandby, END_CRIT_SECTION, EQ_CRC32C, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errhint(), errmsg, FATAL, fb(), fd(), FIN_CRC32C, fsync_fname(), GetCurrentTimestamp(), i, ReplicationSlot::in_use, INIT_CRC32C, INVALID_PROC_NUMBER, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, max_replication_slots, MAXPGPATH, memcpy(), name, NameStr, now(), OpenTransientFile(), PANIC, PG_BINARY, pg_fsync(), PG_REPLSLOT_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotOnDiskChecksummedSize, ReplicationSlotOnDiskConstantSize, ReplicationSlotOnDiskNotChecksummedSize, ReplicationSlotOnDiskV2Size, ReplicationSlotSetInactiveSince(), rmtree(), RS_PERSISTENT, SLOT_MAGIC, SLOT_VERSION, sprintf, StandbyMode, START_CRIT_SECTION, wal_level, WAL_LEVEL_REPLICA, and WARNING.

Referenced by StartupReplicationSlots().

◆ SaveSlotToPath()

static void SaveSlotToPath ( ReplicationSlot * slot,
const char * dir,
int  elevel 
)
static

Definition at line 2520 of file slot.c.

2521{
2522 char tmppath[MAXPGPATH];
2523 char path[MAXPGPATH];
2524 int fd;
2526 bool was_dirty;
2527
2528 /* first check whether there's something to write out */
2529 SpinLockAcquire(&slot->mutex);
2530 was_dirty = slot->dirty;
2531 slot->just_dirtied = false;
2532 SpinLockRelease(&slot->mutex);
2533
2534 /* and don't do anything if there's nothing to write */
2535 if (!was_dirty)
2536 return;
2537
2539
2540 /* silence valgrind :( */
2541 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2542
2543 sprintf(tmppath, "%s/state.tmp", dir);
2544 sprintf(path, "%s/state", dir);
2545
2547 if (fd < 0)
2548 {
2549 /*
2550 * If not an ERROR, then release the lock before returning. In case
2551 * of an ERROR, the error recovery path automatically releases the
2552 * lock, but no harm in explicitly releasing even in that case. Note
2553 * that LWLockRelease() could affect errno.
2554 */
2555 int save_errno = errno;
2556
2558 errno = save_errno;
2559 ereport(elevel,
2561 errmsg("could not create file \"%s\": %m",
2562 tmppath)));
2563 return;
2564 }
2565
2566 cp.magic = SLOT_MAGIC;
2567 INIT_CRC32C(cp.checksum);
2568 cp.version = SLOT_VERSION;
2570
2571 SpinLockAcquire(&slot->mutex);
2572
2573 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2574
2575 SpinLockRelease(&slot->mutex);
2576
2577 COMP_CRC32C(cp.checksum,
2580 FIN_CRC32C(cp.checksum);
2581
2582 errno = 0;
2584 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2585 {
2586 int save_errno = errno;
2587
2590 unlink(tmppath);
2592
2593 /* if write didn't set errno, assume problem is no disk space */
2595 ereport(elevel,
2597 errmsg("could not write to file \"%s\": %m",
2598 tmppath)));
2599 return;
2600 }
2602
2603 /* fsync the temporary file */
2605 if (pg_fsync(fd) != 0)
2606 {
2607 int save_errno = errno;
2608
2611 unlink(tmppath);
2613
2614 errno = save_errno;
2615 ereport(elevel,
2617 errmsg("could not fsync file \"%s\": %m",
2618 tmppath)));
2619 return;
2620 }
2622
2623 if (CloseTransientFile(fd) != 0)
2624 {
2625 int save_errno = errno;
2626
2627 unlink(tmppath);
2629
2630 errno = save_errno;
2631 ereport(elevel,
2633 errmsg("could not close file \"%s\": %m",
2634 tmppath)));
2635 return;
2636 }
2637
2638 /* rename to permanent file, fsync file and directory */
2639 if (rename(tmppath, path) != 0)
2640 {
2641 int save_errno = errno;
2642
2643 unlink(tmppath);
2645
2646 errno = save_errno;
2647 ereport(elevel,
2649 errmsg("could not rename file \"%s\" to \"%s\": %m",
2650 tmppath, path)));
2651 return;
2652 }
2653
2654 /*
2655 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2656 */
2658
2659 fsync_fname(path, false);
2660 fsync_fname(dir, true);
2662
2664
2665 /*
2666 * Successfully wrote, so unset the dirty bit unless somebody dirtied the
2667 * slot again in the meantime, and remember the saved confirmed_flush and
2668 * restart_lsn values.
2669 SpinLockAcquire(&slot->mutex);
2670 if (!slot->just_dirtied)
2671 slot->dirty = false;
2672 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2673 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2674 SpinLockRelease(&slot->mutex);
2675
2677}

References CloseTransientFile(), COMP_CRC32C, ReplicationSlot::data, ReplicationSlot::dirty, END_CRIT_SECTION, ereport, errcode_for_file_access(), errmsg, fb(), fd(), FIN_CRC32C, fsync_fname(), INIT_CRC32C, ReplicationSlot::io_in_progress_lock, ReplicationSlot::just_dirtied, ReplicationSlot::last_saved_confirmed_flush, ReplicationSlot::last_saved_restart_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, memcpy(), ReplicationSlot::mutex, OpenTransientFile(), PG_BINARY, pg_fsync(), PG_REPLSLOT_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), ReplicationSlotOnDiskChecksummedSize, ReplicationSlotOnDiskNotChecksummedSize, ReplicationSlotOnDiskV2Size, SLOT_MAGIC, SLOT_VERSION, SpinLockAcquire(), SpinLockRelease(), sprintf, START_CRIT_SECTION, and write.

Referenced by CheckPointReplicationSlots(), CreateSlotOnDisk(), and ReplicationSlotSave().

◆ SearchNamedReplicationSlot()

◆ SlotExistsInSyncStandbySlots()

bool SlotExistsInSyncStandbySlots ( const char * slot_name)

Definition at line 3076 of file slot.c.

3077{
3078 const char *standby_slot_name;
3079
3080 /* Return false if there is no value in synchronized_standby_slots */
3082 return false;
3083
3084 /*
3085 * XXX: We are not expecting this list to be long so a linear search
3086 * shouldn't hurt but if that turns out not to be true then we can cache
3087 * this information for each WalSender as well.
3088 */
3090 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3091 {
3092 if (strcmp(standby_slot_name, slot_name) == 0)
3093 return true;
3094
3096 }
3097
3098 return false;
3099}

References fb(), i, SyncStandbySlotsConfigData::nslotnames, SyncStandbySlotsConfigData::slot_names, and synchronized_standby_slots_config.

Referenced by PhysicalWakeupLogicalWalSnd().

◆ StandbySlotsHaveCaughtup()

bool StandbySlotsHaveCaughtup ( XLogRecPtr  wait_for_lsn,
int  elevel 
)

Definition at line 3109 of file slot.c.

3110{
3111 const char *name;
3112 int caught_up_slot_num = 0;
3114
3115 /*
3116 * Don't need to wait for the standbys to catch up if there is no value in
3117 * synchronized_standby_slots.
3118 */
3120 return true;
3121
3122 /*
3123 * Don't need to wait for the standbys to catch up if we are on a standby
3124 * server, since we do not support syncing slots to cascading standbys.
3125 */
3126 if (RecoveryInProgress())
3127 return true;
3128
3129 /*
3130 * Don't need to wait for the standbys to catch up if they are already
3131 * beyond the specified WAL location.
3132 */
3135 return true;
3136
3137 /*
3138 * To prevent concurrent slot dropping and creation while filtering the
3139 * slots, take the ReplicationSlotControlLock outside of the loop.
3140 */
3142
3144 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3145 {
3146 XLogRecPtr restart_lsn;
3147 bool invalidated;
3148 bool inactive;
3149 ReplicationSlot *slot;
3150
3151 slot = SearchNamedReplicationSlot(name, false);
3152
3153 /*
3154 * If a slot name provided in synchronized_standby_slots does not
3155 * exist, report a message and exit the loop.
3156 */
3157 if (!slot)
3158 {
3159 ereport(elevel,
3161 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3162 name, "synchronized_standby_slots"),
3163 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3164 name),
3165 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3166 name, "synchronized_standby_slots"));
3167 break;
3168 }
3169
3170 /* Same as above: if a slot is not physical, exit the loop. */
3171 if (SlotIsLogical(slot))
3172 {
3173 ereport(elevel,
3175 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3176 name, "synchronized_standby_slots"),
3177 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3178 name),
3179 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3180 name, "synchronized_standby_slots"));
3181 break;
3182 }
3183
3184 SpinLockAcquire(&slot->mutex);
3185 restart_lsn = slot->data.restart_lsn;
3186 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3188 SpinLockRelease(&slot->mutex);
3189
3190 if (invalidated)
3191 {
3192 /* Specified physical slot has been invalidated */
3193 ereport(elevel,
3195 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3196 name, "synchronized_standby_slots"),
3197 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3198 name),
3199 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3200 name, "synchronized_standby_slots"));
3201 break;
3202 }
3203
3204 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3205 {
3206 /* Log a message if no active_pid for this physical slot */
3207 if (inactive)
3208 ereport(elevel,
3210 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3211 name, "synchronized_standby_slots"),
3212 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3213 name),
3214 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3215 name, "synchronized_standby_slots"));
3216
3217 /* Stop checking further slots if the current slot hasn't caught up. */
3218 break;
3219 }
3220
3221 Assert(restart_lsn >= wait_for_lsn);
3222
3224 min_restart_lsn > restart_lsn)
3225 min_restart_lsn = restart_lsn;
3226
3228
3229 name += strlen(name) + 1;
3230 }
3231
3233
3234 /*
3235 * Return false if not all the standbys have caught up to the specified
3236 * WAL location.
3237 */
3239 return false;
3240
3241 /* The ss_oldest_flush_lsn must not retreat. */
3244
3246
3247 return true;
3248}

References ReplicationSlot::active_proc, Assert, ReplicationSlot::data, ereport, errcode(), errdetail(), errhint(), errmsg, fb(), i, INVALID_PROC_NUMBER, ReplicationSlotPersistentData::invalidated, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, name, SyncStandbySlotsConfigData::nslotnames, RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, SearchNamedReplicationSlot(), SyncStandbySlotsConfigData::slot_names, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), ss_oldest_flush_lsn, synchronized_standby_slots_config, and XLogRecPtrIsValid.

Referenced by NeedToWaitForStandbys(), and WaitForStandbyConfirmation().

◆ StartupReplicationSlots()

void StartupReplicationSlots ( void  )

Definition at line 2398 of file slot.c.

2399{
2401 struct dirent *replication_de;
2402
2403 elog(DEBUG1, "starting up replication slots");
2404
2405 /* restore all slots by iterating over all on-disk entries */
2408 {
2409 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2411
2412 if (strcmp(replication_de->d_name, ".") == 0 ||
2413 strcmp(replication_de->d_name, "..") == 0)
2414 continue;
2415
2416 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2418
2419 /* we're only creating directories here, skip if it's not our's */
2421 continue;
2422
2423 /* we crashed while a slot was being setup or deleted, clean up */
2424 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2425 {
2426 if (!rmtree(path, true))
2427 {
2429 (errmsg("could not remove directory \"%s\"",
2430 path)));
2431 continue;
2432 }
2434 continue;
2435 }
2436
2437 /* looks like a slot in a normal state, restore */
2439 }
2441
2442 /* currently no slots exist, we're done. */
2444 return;
2445
2446 /* Now that we have recovered all the data, compute replication xmin */
2449}

References AllocateDir(), DEBUG1, elog, ereport, errmsg, fb(), FreeDir(), fsync_fname(), get_dirent_type(), max_repack_replication_slots, max_replication_slots, MAXPGPATH, PG_REPLSLOT_DIR, pg_str_endswith(), PGFILETYPE_DIR, PGFILETYPE_ERROR, ReadDir(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RestoreSlotFromDisk(), rmtree(), snprintf, and WARNING.

Referenced by StartupXLOG().

◆ StaticAssertDecl()

StaticAssertDecl ( lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1),
"array length mismatch"   
)

◆ validate_sync_standby_slots()

static bool validate_sync_standby_slots ( char * rawname,
List **  elemlist 
)
static

Definition at line 2971 of file slot.c.

2972{
2973 /* Verify syntax and parse string into a list of identifiers */
2975 {
2976 GUC_check_errdetail("List syntax is invalid.");
2977 return false;
2978 }
2979
2980 /* Iterate the list to validate each slot name */
2981 foreach_ptr(char, name, *elemlist)
2982 {
2983 int err_code;
2984 char *err_msg = NULL;
2985 char *err_hint = NULL;
2986
2988 &err_msg, &err_hint))
2989 {
2991 GUC_check_errdetail("%s", err_msg);
2992 if (err_hint != NULL)
2994 return false;
2995 }
2996 }
2997
2998 return true;
2999}

References fb(), foreach_ptr, GUC_check_errcode(), GUC_check_errdetail, GUC_check_errhint, name, ReplicationSlotValidateNameInternal(), and SplitIdentifierString().

Referenced by check_synchronized_standby_slots().

◆ WaitForStandbyConfirmation()

void WaitForStandbyConfirmation ( XLogRecPtr  wait_for_lsn)

Definition at line 3257 of file slot.c.

3258{
3259 /*
3260 * Don't need to wait for the standby to catch up if the current acquired
3261 * slot is not a logical failover slot, or there is no value in
3262 * synchronized_standby_slots.
3263 */
3265 return;
3266
3268
3269 for (;;)
3270 {
3272
3274 {
3275 ConfigReloadPending = false;
3277 }
3278
3279 /* Exit if done waiting for every slot. */
3281 break;
3282
3283 /*
3284 * Wait for the slots in the synchronized_standby_slots to catch up,
3285 * but use a timeout (1s) so we can also check if the
3286 * synchronized_standby_slots has been changed.
3287 */
3290 }
3291
3293}

References CHECK_FOR_INTERRUPTS, ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableTimedSleep(), ConfigReloadPending, ReplicationSlot::data, ReplicationSlotPersistentData::failover, fb(), MyReplicationSlot, PGC_SIGHUP, ProcessConfigFile(), StandbySlotsHaveCaughtup(), synchronized_standby_slots_config, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtl, and WARNING.

Referenced by LogicalSlotAdvanceAndCheckSnapState(), and pg_logical_slot_get_changes_guts().

Variable Documentation

◆ idle_replication_slot_timeout_secs

int idle_replication_slot_timeout_secs = 0

◆ max_repack_replication_slots

◆ max_replication_slots

◆ MyReplicationSlot

ReplicationSlot* MyReplicationSlot = NULL

Definition at line 158 of file slot.c.

Referenced by ApplyLauncherMain(), binary_upgrade_check_logical_slot_pending_wal(), compute_min_nonremovable_xid(), copy_replication_slot(), create_logical_replication_slot(), create_physical_replication_slot(), CreateConflictDetectionSlot(), CreateDecodingContext(), CreateInitDecodingContext(), CreateReplicationSlot(), DisableLogicalDecodingIfNecessary(), EnsureLogicalDecodingEnabled(), init_conflict_slot_xmin(), InvalidatePossiblyObsoleteSlot(), LogicalConfirmReceivedLocation(), LogicalIncreaseRestartDecodingForSlot(), LogicalIncreaseXminForSlot(), logicalrep_worker_launch(), LogicalReplicationSlotCheckPendingWal(), LogicalSlotAdvanceAndCheckSnapState(), NeedToWaitForStandbys(), pg_create_logical_replication_slot(), pg_create_physical_replication_slot(), pg_logical_slot_get_changes_guts(), pg_physical_replication_slot_advance(), pg_replication_slot_advance(), PhysicalConfirmReceivedLocation(), PhysicalReplicationSlotNewXmin(), PhysicalWakeupLogicalWalSnd(), PostgresMain(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), ReorderBufferAllocate(), ReorderBufferFree(), ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReplicationSlotAcquire(), ReplicationSlotAlter(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDrop(), ReplicationSlotDropAcquired(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotsDropDBSlots(), ReplicationSlotShmemExit(), reserve_wal_for_local_slot(), slotsync_failure_callback(), slotsync_worker_onexit(), StartLogicalReplication(), StartReplication(), StartupDecodingContext(), synchronize_one_slot(), update_and_persist_local_synced_slot(), update_conflict_slot_xmin(), update_local_synced_slot(), update_slotsync_skip_stats(), WaitForStandbyConfirmation(), and WalSndErrorCleanup().

◆ ReplicationSlotCtl

◆ ReplicationSlotsShmemCallbacks

const ShmemCallbacks ReplicationSlotsShmemCallbacks
Initial value:

Definition at line 152 of file slot.c.

152 {
153 .request_fn = ReplicationSlotsShmemRequest,
154 .init_fn = ReplicationSlotsShmemInit,
155};

◆ SlotInvalidationCauses

const SlotInvalidationCauseMap SlotInvalidationCauses[]
static
Initial value:
= {
{RS_INVAL_NONE, "none"},
{RS_INVAL_WAL_REMOVED, "wal_removed"},
{RS_INVAL_HORIZON, "rows_removed"},
{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
}

Definition at line 115 of file slot.c.

115 {
116 {RS_INVAL_NONE, "none"},
117 {RS_INVAL_WAL_REMOVED, "wal_removed"},
118 {RS_INVAL_HORIZON, "rows_removed"},
119 {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
120 {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
121};

Referenced by GetSlotInvalidationCause(), and GetSlotInvalidationCauseName().

◆ ss_oldest_flush_lsn

XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr
static

Definition at line 185 of file slot.c.

Referenced by assign_synchronized_standby_slots(), and StandbySlotsHaveCaughtup().

◆ synchronized_standby_slots

char* synchronized_standby_slots

Definition at line 176 of file slot.c.

◆ synchronized_standby_slots_config