PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the directory
24 * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 * contain the slot's own data. Additional data can be stored alongside that
26 * file if required. While the server is running, the state data is also
27 * cached in memory for efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include <unistd.h>
40#include <sys/stat.h>
41
42#include "access/transam.h"
44#include "access/xlogrecovery.h"
45#include "common/file_utils.h"
46#include "common/string.h"
47#include "miscadmin.h"
48#include "pgstat.h"
52#include "replication/slot.h"
54#include "storage/fd.h"
55#include "storage/ipc.h"
56#include "storage/proc.h"
57#include "storage/procarray.h"
58#include "storage/subsystems.h"
59#include "utils/builtins.h"
60#include "utils/guc_hooks.h"
62#include "utils/varlena.h"
63#include "utils/wait_event.h"
64
65/*
66 * Replication slot on-disk data structure.
67 */
69{
70 /* first part of this struct needs to be version independent */
71
72 /* data not covered by checksum */
75
76 /* data covered by checksum */
79
80 /*
81 * The actual data in the slot that follows can differ based on the above
82 * 'version'.
83 */
84
87
88/*
89 * Struct for the configuration of synchronized_standby_slots.
90 *
91 * Note: this must be a flat representation that can be held in a single chunk
92 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
93 * synchronized_standby_slots GUC.
94 */
95typedef struct
96{
97 /* Number of slot names in the slot_names[] */
99
100 /*
101 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
102 */
103 char slot_names[FLEXIBLE_ARRAY_MEMBER];
105
106/*
107 * Lookup table for slot invalidation causes.
108 */
114
116 {RS_INVAL_NONE, "none"},
117 {RS_INVAL_WAL_REMOVED, "wal_removed"},
118 {RS_INVAL_HORIZON, "rows_removed"},
119 {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
120 {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
121};
122
123/*
124 * Ensure that the lookup table is up-to-date with the enums defined in
125 * ReplicationSlotInvalidationCause.
126 */
128 "array length mismatch");
129
/*
 * Size helpers for the on-disk slot layout (ReplicationSlotOnDisk).
 *
 * Each helper is derived from the struct layout with offsetof()/sizeof().
 * The two subtraction-based helpers are fully parenthesized so that they
 * act as a single operand wherever they are used; without the parentheses
 * an expression such as "2 * ReplicationSlotOnDiskV2Size" would expand to
 * "2 * sizeof(...) - offsetof(...)" and silently compute the wrong value.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)
142
143#define SLOT_MAGIC 0x1051CA1 /* format identifier */
144#define SLOT_VERSION 5 /* version for new files */
145
146/* Control array for replication slot management */
148
149static void ReplicationSlotsShmemRequest(void *arg);
150static void ReplicationSlotsShmemInit(void *arg);
151
156
157/* My backend's replication slot in the shared memory array */
159
160/* GUC variables */
161int max_replication_slots = 10; /* the maximum number of replication
162 * slots */
163int max_repack_replication_slots = 5; /* the maximum number of slots
164 * for REPACK */
165
166/*
167 * Invalidate replication slots that have remained idle longer than this
168 * duration; '0' disables it.
169 */
171
172/*
173 * This GUC lists streaming replication standby server slot names that
174 * logical WAL sender processes will wait for.
175 */
177
178/* This is the parsed and cached configuration for synchronized_standby_slots */
180
181/*
182 * Oldest LSN that has been confirmed to be flushed to the standbys
183 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
184 */
186
187static void ReplicationSlotShmemExit(int code, Datum arg);
188static bool IsSlotForConflictCheck(const char *name);
189static void ReplicationSlotDropPtr(ReplicationSlot *slot);
190
191/* internal persistency functions */
192static void RestoreSlotFromDisk(const char *name);
193static void CreateSlotOnDisk(ReplicationSlot *slot);
194static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
195
196/*
197 * Register shared memory space needed for replication slots.
198 */
199static void
201{
202 Size size;
203
205 return;
206
207 size = offsetof(ReplicationSlotCtlData, replication_slots);
208 size = add_size(size,
210 sizeof(ReplicationSlot)));
211 ShmemRequestStruct(.name = "ReplicationSlot Ctl",
212 .size = size,
213 .ptr = (void **) &ReplicationSlotCtl,
214 );
215}
216
217/*
218 * Initialize shared memory for replication slots.
219 */
220static void
222{
224 {
226
227 /* everything else is zeroed by the memset above */
229 SpinLockInit(&slot->mutex);
233 }
234}
235
236/*
237 * Register the callback for replication slot cleanup and releasing.
238 */
239void
244
245/*
246 * Release and cleanup replication slots.
247 */
248static void
250{
251 /* Make sure active replication slots are released */
252 if (MyReplicationSlot != NULL)
254
255 /* Also cleanup all the temporary slots. */
257}
258
259/*
260 * Check whether the passed slot name is valid and report errors at elevel.
261 *
262 * See comments for ReplicationSlotValidateNameInternal().
263 */
264bool
266 int elevel)
267{
268 int err_code;
269 char *err_msg = NULL;
270 char *err_hint = NULL;
271
273 &err_code, &err_msg, &err_hint))
274 {
275 /*
276 * Use errmsg_internal() and errhint_internal() instead of errmsg()
277 * and errhint(), since the messages from
278 * ReplicationSlotValidateNameInternal() are already translated. This
279 * avoids double translation.
280 */
281 ereport(elevel,
283 errmsg_internal("%s", err_msg),
284 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
285
286 pfree(err_msg);
287 if (err_hint != NULL)
289 return false;
290 }
291
292 return true;
293}
294
295/*
296 * Check whether the passed slot name is valid.
297 *
298 * An error will be reported for a reserved replication slot name if
299 * allow_reserved_name is set to false.
300 *
301 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
302 * the name to be used as a directory name on every supported OS.
303 *
304 * Returns true if the slot name is valid. Otherwise, returns false and stores
305 * the error code, error message, and optional hint in err_code, err_msg, and
306 * err_hint, respectively. The caller is responsible for freeing err_msg and
307 * err_hint, which are palloc'd.
308 */
309bool
311 int *err_code, char **err_msg, char **err_hint)
312{
313 const char *cp;
314
315 if (strlen(name) == 0)
316 {
318 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
319 *err_hint = NULL;
320 return false;
321 }
322
323 if (strlen(name) >= NAMEDATALEN)
324 {
326 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
327 *err_hint = NULL;
328 return false;
329 }
330
331 for (cp = name; *cp; cp++)
332 {
333 if (!((*cp >= 'a' && *cp <= 'z')
334 || (*cp >= '0' && *cp <= '9')
335 || (*cp == '_')))
336 {
338 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
339 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
340 return false;
341 }
342 }
343
345 {
347 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
348 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 return false;
351 }
352
353 return true;
354}
355
356/*
357 * Return true if the replication slot name is "pg_conflict_detection".
358 */
359static bool
361{
362 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
363}
364
365/*
366 * Create a new replication slot and mark it as used by this backend.
367 *
368 * name: Name of the slot
369 * db_specific: logical decoding is db specific; if the slot is going to
370 * be used for that pass true, otherwise false.
371 * two_phase: If enabled, allows decoding of prepared transactions.
372 * repack: If true, use a slot from the pool for REPACK.
373 * failover: If enabled, allows the slot to be synced to standbys so
374 * that logical replication can be resumed after failover.
375 * synced: True if the slot is synchronized from the primary server.
376 */
377void
379 ReplicationSlotPersistency persistency,
380 bool two_phase, bool repack, bool failover, bool synced)
381{
382 ReplicationSlot *slot = NULL;
383 int startpoint,
384 endpoint;
385
387
388 /*
389 * The logical launcher or pg_upgrade may create or migrate an internal
390 * slot, so using a reserved name is allowed in these cases.
391 */
393 ERROR);
394
395 if (failover)
396 {
397 /*
398 * Do not allow users to create the failover enabled slots on the
399 * standby as we do not support sync to the cascading standby.
400 *
401 * However, failover enabled slots can be created during slot
402 * synchronization because we need to retain the same values as the
403 * remote slot.
404 */
408 errmsg("cannot enable failover for a replication slot created on the standby"));
409
410 /*
411 * Do not allow users to create failover enabled temporary slots,
412 * because temporary slots will not be synced to the standby.
413 *
414 * However, failover enabled temporary slots can be created during
415 * slot synchronization. See the comments atop slotsync.c for details.
416 */
417 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
420 errmsg("cannot enable failover for a temporary replication slot"));
421 }
422
423 /*
424 * If some other backend ran this code concurrently with us, we'd likely
425 * both allocate the same slot, and that would be bad. We'd also be at
426 * risk of missing a name collision. Also, we don't want to try to create
427 * a new slot while somebody's busy cleaning up an old one, because we
428 * might both be monkeying with the same directory.
429 */
431
432 /*
433 * Check for name collision (across the whole array), and identify an
434 * allocatable slot (in the array slice specific to our current use case:
435 * either general, or REPACK only). We need to hold
436 * ReplicationSlotControlLock in shared mode for this, so that nobody else
437 * can change the in_use flags while we're looking at them.
438 */
440 startpoint = !repack ? 0 : max_replication_slots;
443 {
445
446 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
449 errmsg("replication slot \"%s\" already exists", name)));
450
451 if (i >= startpoint && i < endpoint &&
452 !s->in_use && slot == NULL)
453 slot = s;
454 }
456
457 /* If all slots are in use, we're out of luck. */
458 if (slot == NULL)
461 errmsg("all replication slots are in use"),
462 errhint("Free one or increase \"%s\".",
463 repack ? "max_repack_replication_slots" : "max_replication_slots")));
464
465 /*
466 * Since this slot is not in use, nobody should be looking at any part of
467 * it other than the in_use field unless they're trying to allocate it.
468 * And since we hold ReplicationSlotAllocationLock, nobody except us can
469 * be doing that. So it's safe to initialize the slot.
470 */
471 Assert(!slot->in_use);
473
474 /* first initialize persistent data */
475 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
476 namestrcpy(&slot->data.name, name);
478 slot->data.persistency = persistency;
479 slot->data.two_phase = two_phase;
481 slot->data.failover = failover;
482 slot->data.synced = synced;
483
484 /* and then data only present in shared memory */
485 slot->just_dirtied = false;
486 slot->dirty = false;
495 slot->inactive_since = 0;
497
498 /*
499 * Create the slot on disk. We haven't actually marked the slot allocated
500 * yet, so no special cleanup is required if this errors out.
501 */
502 CreateSlotOnDisk(slot);
503
504 /*
505 * We need to briefly prevent any other backend from iterating over the
506 * slots while we flip the in_use flag. We also need to set the active
507 * flag while holding the ControlLock as otherwise a concurrent
508 * ReplicationSlotAcquire() could acquire the slot as well.
509 */
511
512 slot->in_use = true;
513
514 /* We can now mark the slot active, and that makes it our slot. */
515 SpinLockAcquire(&slot->mutex);
518 SpinLockRelease(&slot->mutex);
519 MyReplicationSlot = slot;
520
522
523 /*
524 * Create statistics entry for the new logical slot. We don't collect any
525 * stats for physical slots, so no need to create an entry for the same.
526 * See ReplicationSlotDropPtr for why we need to do this before releasing
527 * ReplicationSlotAllocationLock.
528 */
529 if (SlotIsLogical(slot))
531
532 /*
533 * Now that the slot has been marked as in_use and active, it's safe to
534 * let somebody else try to allocate a slot.
535 */
537
538 /* Let everybody know we've modified this slot */
540}
541
542/*
543 * Search for the named replication slot.
544 *
545 * Return the replication slot if found, otherwise NULL.
546 */
549{
550 int i;
551 ReplicationSlot *slot = NULL;
552
553 if (need_lock)
555
557 {
559
560 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
561 {
562 slot = s;
563 break;
564 }
565 }
566
567 if (need_lock)
569
570 return slot;
571}
572
573/*
574 * Return the index of the replication slot in
575 * ReplicationSlotCtl->replication_slots.
576 *
577 * This is mainly useful to have an efficient key for storing replication slot
578 * stats.
579 */
580int
589
590/*
591 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
592 * the slot's name and true is returned.
593 *
594 * This likely is only useful for pgstat_replslot.c during shutdown, in other
595 * cases there are obvious TOCTOU issues.
596 */
597bool
599{
600 ReplicationSlot *slot;
601 bool found;
602
604
605 /*
606 * Ensure that the slot cannot be dropped while we copy the name. Don't
607 * need the spinlock as the name of an existing slot cannot change.
608 */
610 found = slot->in_use;
611 if (slot->in_use)
614
615 return found;
616}
617
618/*
619 * Find a previously created slot and mark it as used by this process.
620 *
621 * An error is raised if nowait is true and the slot is currently in use. If
622 * nowait is false, we sleep until the slot is released by the owning process.
623 *
624 * An error is raised if error_if_invalid is true and the slot is found to
625 * be invalid. It should always be set to true, except when we are temporarily
626 * acquiring the slot and don't intend to change it.
627 */
628void
629ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
630{
	/*
	 * active_proc: proc number recorded as the slot's owner, either read
	 * from s->active_proc or set to MyProcNumber below.
	 * active_pid: PID of that process, used in the "is active for PID"
	 * error message.
	 */
632 ProcNumber active_proc;
633 int active_pid;
634
	/* A slot name is mandatory. */
635 Assert(name != NULL);
636
	/*
	 * Re-entered via "goto retry" after waiting for the current owner to
	 * release the slot; the lookup and ownership checks are then redone.
	 */
637retry:
639
641
642 /* Check if the slot exists with the given name. */
644 if (s == NULL || !s->in_use)
645 {
647
650 errmsg("replication slot \"%s\" does not exist",
651 name)));
652 }
653
654 /*
655 * Do not allow users to acquire the reserved slot. This scenario may
656 * occur if the launcher that owns the slot has terminated unexpectedly
657 * due to an error, and a backend process attempts to reuse the slot.
658 */
662 errmsg("cannot acquire replication slot \"%s\"", name),
663 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
664
665 /*
666 * This is the slot we want; check if it's active under some other
667 * process. In single user mode, we don't need this check.
668 */
670 {
671 /*
672 * Get ready to sleep on the slot in case it is active. (We may end
673 * up not sleeping, but we don't want to do this while holding the
674 * spinlock.)
675 */
676 if (!nowait)
678
679 /*
680 * It is important to reset the inactive_since under spinlock here to
681 * avoid race conditions with slot invalidation. See comments related
682 * to inactive_since in InvalidatePossiblyObsoleteSlot.
683 */
687 active_proc = s->active_proc;
690 }
691 else
692 {
		/* No ownership check on this path: claim the slot directly. */
693 s->active_proc = active_proc = MyProcNumber;
695 }
	/* Look up the owner's PID so it can be reported if we must error out. */
696 active_pid = GetPGProcByNumber(active_proc)->pid;
698
699 /*
700 * If we found the slot but it's already active in another process, we
701 * wait until the owning process signals us that it's been released, or
702 * error out.
703 */
704 if (active_proc != MyProcNumber)
705 {
706 if (!nowait)
707 {
708 /* Wait here until we get signaled, and then restart */
712 goto retry;
713 }
714
717 errmsg("replication slot \"%s\" is active for PID %d",
718 NameStr(s->data.name), active_pid)));
719 }
720 else if (!nowait)
721 ConditionVariableCancelSleep(); /* no sleep needed after all */
722
723 /* We made this slot active, so it's ours now. */
725
726 /*
727 * We need to check for invalidation after making the slot ours to avoid
728 * the possible race condition with the checkpointer that can otherwise
729 * invalidate the slot immediately after the check.
730 */
734 errmsg("can no longer access replication slot \"%s\"",
735 NameStr(s->data.name)),
736 errdetail("This replication slot has been invalidated due to \"%s\".",
738
739 /* Let everybody know we've modified this slot */
741
742 /*
743 * The call to pgstat_acquire_replslot() protects against stats for a
744 * different slot, from before a restart or such, being present during
745 * pgstat_report_replslot().
746 */
747 if (SlotIsLogical(s))
749
750
	/* The "acquired ..." message is only produced when am_walsender is set. */
751 if (am_walsender)
752 {
755 ? errmsg("acquired logical replication slot \"%s\"",
756 NameStr(s->data.name))
757 : errmsg("acquired physical replication slot \"%s\"",
758 NameStr(s->data.name)));
759 }
760}
761
762/*
763 * Release the replication slot that this backend considers itself to own.
764 *
765 * This or another backend can re-acquire the slot later.
766 * Resources this slot requires will be preserved.
767 */
768void
770{
772 char *slotname = NULL; /* keep compiler quiet */
773 bool is_logical;
774 TimestampTz now = 0;
775
776 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
777
779
780 if (am_walsender)
781 slotname = pstrdup(NameStr(slot->data.name));
782
783 if (slot->data.persistency == RS_EPHEMERAL)
784 {
785 /*
786 * If slot is ephemeral, we drop it upon release, and request logical
787 * decoding be disabled.
788 */
790 }
791
792 /*
793 * If slot needed to temporarily restrain both data and catalog xmin to
794 * create the catalog snapshot, remove that temporary constraint.
795 * Snapshots can only be exported while the initial snapshot is still
796 * acquired.
797 */
798 if (!TransactionIdIsValid(slot->data.xmin) &&
800 {
801 SpinLockAcquire(&slot->mutex);
803 SpinLockRelease(&slot->mutex);
805 }
806
807 /*
808 * Set the time since the slot has become inactive. We get the current
809 * time beforehand to avoid system call while holding the spinlock.
810 */
812
813 if (slot->data.persistency == RS_PERSISTENT)
814 {
815 /*
816 * Mark persistent slot inactive. We're not freeing it, just
817 * disconnecting, but wake up others that may be waiting for it.
818 */
819 SpinLockAcquire(&slot->mutex);
822 SpinLockRelease(&slot->mutex);
824 }
825 else
827
829
830 /* might not have been set when we've been a plain slot */
835
836 if (am_walsender)
837 {
840 ? errmsg("released logical replication slot \"%s\"",
841 slotname)
842 : errmsg("released physical replication slot \"%s\"",
843 slotname));
844
845 pfree(slotname);
846 }
847}
848
849/*
850 * Cleanup temporary slots created in current session.
851 *
852 * Cleanup only synced temporary slots if 'synced_only' is true, else
853 * cleanup all temporary slots.
854 *
855 * If it drops the last logical slot in the cluster, requests to disable
856 * logical decoding.
857 */
858void
860{
861 int i;
863 bool dropped_logical = false;
864
866
867restart:
871 {
873
874 if (!s->in_use)
875 continue;
876
878
881
882 if ((s->active_proc == MyProcNumber &&
883 (!synced_only || s->data.synced)))
884 {
887 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
888
889 if (SlotIsLogical(s))
890 dropped_logical = true;
891
893
895 goto restart;
896 }
897 else
899 }
900
902
905}
906
907/*
908 * Permanently drop the replication slot identified by the passed-in name.
909 *
910 * If this is a logical slot, request that logical decoding be disabled.
911 */
912void
913ReplicationSlotDrop(const char *name, bool nowait)
914{
	/*
	 * Take ownership of the slot first; 'nowait' is forwarded unchanged.
	 * error_if_invalid is passed as false, so the acquire step does not
	 * raise an error merely because the slot has been invalidated.
	 */
915 ReplicationSlotAcquire(name, nowait, false);
916
917 /*
918 * Do not allow users to drop the slots which are currently being synced
919 * from the primary to the standby.
920 */
924 errmsg("cannot drop replication slot \"%s\"", name),
925 errdetail("This replication slot is being synchronized from the primary server."));
926
928}
929
930/*
931 * Change the definition of the slot identified by the specified name.
932 *
933 * Altering the two_phase property of a slot requires caution on the
934 * client-side. Enabling it at any random point during decoding has the
935 * risk that transactions prepared before this change may be skipped by
936 * the decoder, leading to missing prepare records on the client. So, we
937 * enable it for subscription related slots only once the initial tablesync
938 * is finished. See comments atop worker.c. Disabling it is safe only when
939 * there are no pending prepared transactions; otherwise, the changes of
940 * already prepared transactions can be replicated again along with their
941 * corresponding commit leading to duplicate data or errors.
942 */
943void
944ReplicationSlotAlter(const char *name, const bool *failover,
945 const bool *two_phase)
946{
	/*
	 * 'failover' is optional: every use below is guarded by a NULL check,
	 * so a NULL pointer means that property is not being changed.
	 * NOTE(review): 'two_phase' presumably follows the same convention --
	 * confirm.
	 *
	 * update_slot is set to true by the branches below that change a
	 * property, and gates the final update step.
	 */
947 bool update_slot = false;
948
951
	/*
	 * Acquire the slot, waiting for it if it is in use (nowait = false)
	 * and raising an error if it has been invalidated
	 * (error_if_invalid = true).
	 */
952 ReplicationSlotAcquire(name, false, true);
953
957 errmsg("cannot use %s with a physical replication slot",
958 "ALTER_REPLICATION_SLOT"));
959
	/* Additional restrictions that apply only while in recovery. */
960 if (RecoveryInProgress())
961 {
962 /*
963 * Do not allow users to alter the slots which are currently being
964 * synced from the primary to the standby.
965 */
969 errmsg("cannot alter replication slot \"%s\"", name),
970 errdetail("This replication slot is being synchronized from the primary server."));
971
972 /*
973 * Do not allow users to enable failover on the standby as we do not
974 * support sync to the cascading standby.
975 */
976 if (failover && *failover)
979 errmsg("cannot enable failover for a replication slot"
980 " on the standby"));
981 }
982
	/* Handle a requested change of the failover property. */
983 if (failover)
984 {
985 /*
986 * Do not allow users to enable failover for temporary slots as we do
987 * not support syncing temporary slots to the standby.
988 */
992 errmsg("cannot enable failover for a temporary replication slot"));
993
995 {
999
1000 update_slot = true;
1001 }
1002 }
1003
1005 {
1009
1010 update_slot = true;
1011 }
1012
	/* Only act if one of the branches above recorded a change. */
1013 if (update_slot)
1014 {
1017 }
1018
1020}
1021
1022/*
1023 * Permanently drop the currently acquired replication slot.
1024 *
1025 * If caller requests it, have checkpointer attempt to disable logical
1026 * decoding. Obviously, this should only be done if the slot is logical.
1027 */
1028void
1030{
1031 ReplicationSlot *slot;
1032
1034 slot = MyReplicationSlot;
1035
1036 /* Can only disable logical decoding if slot is logical */
1037 Assert(!try_disable || SlotIsLogical(slot));
1038
1039 /* slot isn't acquired anymore */
1041
1043
1044 if (try_disable)
1046}
1047
1048/*
1049 * Permanently drop the replication slot which will be released by the point
1050 * this function returns.
1051 */
1052static void
1054{
1055 char path[MAXPGPATH];
1056 char tmppath[MAXPGPATH];
1057
1058 /*
1059 * If some other backend ran this code concurrently with us, we might try
1060 * to delete a slot with a certain name while someone else was trying to
1061 * create a slot with the same name.
1062 */
1064
1065 /* Generate pathnames. */
1066 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1067 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1068
1069 /*
1070 * Rename the slot directory on disk, so that we'll no longer recognize
1071 * this as a valid slot. Note that if this fails, we've got to mark the
1072 * slot inactive before bailing out. If we're dropping an ephemeral or a
1073 * temporary slot, we better never fail hard as the caller won't expect
1074 * the slot to survive and this might get called during error handling.
1075 */
1076 if (rename(path, tmppath) == 0)
1077 {
1078 /*
1079 * We need to fsync() the directory we just renamed and its parent to
1080 * make sure that our changes are on disk in a crash-safe fashion. If
1081 * fsync() fails, we can't be sure whether the changes are on disk or
1082 * not. For now, we handle that by panicking;
1083 * StartupReplicationSlots() will try to straighten it out after
1084 * restart.
1085 */
1087 fsync_fname(tmppath, true);
1090 }
1091 else
1092 {
1093 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1094
1095 SpinLockAcquire(&slot->mutex);
1097 SpinLockRelease(&slot->mutex);
1098
1099 /* wake up anyone waiting on this slot */
1101
1104 errmsg("could not rename file \"%s\" to \"%s\": %m",
1105 path, tmppath)));
1106 }
1107
1108 /*
1109 * The slot is definitely gone. Lock out concurrent scans of the array
1110 * long enough to kill it. It's OK to clear the active PID here without
1111 * grabbing the mutex because nobody else can be scanning the array here,
1112 * and nobody can be attached to this slot and thus access it without
1113 * scanning the array.
1114 *
1115 * Also wake up processes waiting for it.
1116 */
1119 slot->in_use = false;
1122
1123 /*
1124 * Slot is dead and doesn't prevent resource removal anymore, recompute
1125 * limits.
1126 */
1129
1130 /*
1131 * If removing the directory fails, the worst thing that will happen is
1132 * that the user won't be able to create a new slot with the same name
1133 * until the next server restart. We warn about it, but that's all.
1134 */
1135 if (!rmtree(tmppath, true))
1137 (errmsg("could not remove directory \"%s\"", tmppath)));
1138
1139 /*
1140 * Drop the statistics entry for the replication slot. Do this while
1141 * holding ReplicationSlotAllocationLock so that we don't drop a
1142 * statistics entry for another slot with the same name just created in
1143 * another session.
1144 */
1145 if (SlotIsLogical(slot))
1147
1148 /*
1149 * We release this at the very end, so that nobody starts trying to create
1150 * a slot while we're still cleaning up the detritus of the old one.
1151 */
1153}
1154
1155/*
1156 * Serialize the currently acquired slot's state from memory to disk, thereby
1157 * guaranteeing the current state will survive a crash.
1158 */
1159void
1161{
1162 char path[MAXPGPATH];
1163
1165
1168}
1169
1170/*
1171 * Signal that it would be useful if the currently acquired slot would be
1172 * flushed out to disk.
1173 *
1174 * Note that the actual flush to disk can be delayed for a long time; if an
1175 * immediate flush is required for correctness, explicitly call ReplicationSlotSave().
1176 */
1177void
1179{
1181
1183
1184 SpinLockAcquire(&slot->mutex);
1186 MyReplicationSlot->dirty = true;
1187 SpinLockRelease(&slot->mutex);
1188}
1189
1190/*
1191 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1192 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1193 */
1194void
1196{
1198
1199 Assert(slot != NULL);
1201
1202 SpinLockAcquire(&slot->mutex);
1204 SpinLockRelease(&slot->mutex);
1205
1208}
1209
1210/*
1211 * Compute the oldest xmin across all slots and store it in the ProcArray.
1212 *
1213 * If already_locked is true, both the ReplicationSlotControlLock and the
1214 * ProcArrayLock have already been acquired exclusively. It is crucial that the
1215 * caller first acquires the ReplicationSlotControlLock, followed by the
1216 * ProcArrayLock, to prevent any undetectable deadlocks since this function
1217 * acquires them in that order.
1218 */
1219void
1221{
1222 int i;
1225
1230
1231 /*
1232 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1233 * values, so no backend updates the initial xmin for newly created slot
1234 * concurrently. A shared lock is used here to minimize lock contention,
1235 * especially when many slots exist and advancements occur frequently.
1236 * This is safe since an exclusive lock is taken during initial slot xmin
1237 * update in slot creation.
1238 *
1239 * One might think that we can hold the ProcArrayLock exclusively and
1240 * update the slot xmin values, but it could increase lock contention on
1241 * the ProcArrayLock, which is not great since this function can be called
1242 * at non-negligible frequency.
1243 *
1244 * Concurrent invocation of this function may cause the computed slot xmin
1245 * to regress. However, this is harmless because tuples prior to the most
1246 * recent xmin are no longer useful once advancement occurs (see
1247 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1248 * before updating the effective_xmin). Thus, such regression merely
1249 * prevents VACUUM from prematurely removing tuples without causing the
1250 * early deletion of required data.
1251 */
1252 if (!already_locked)
1254
1256 {
1258 TransactionId effective_xmin;
1259 TransactionId effective_catalog_xmin;
1260 bool invalidated;
1261
1262 if (!s->in_use)
1263 continue;
1264
1266 effective_xmin = s->effective_xmin;
1267 effective_catalog_xmin = s->effective_catalog_xmin;
1268 invalidated = s->data.invalidated != RS_INVAL_NONE;
1270
1271 /* invalidated slots need not apply */
1272 if (invalidated)
1273 continue;
1274
1275 /* check the data xmin */
1276 if (TransactionIdIsValid(effective_xmin) &&
1278 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1279 agg_xmin = effective_xmin;
1280
1281 /* check the catalog xmin */
1282 if (TransactionIdIsValid(effective_catalog_xmin) &&
1284 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1285 agg_catalog_xmin = effective_catalog_xmin;
1286 }
1287
1289
1290 if (!already_locked)
1292}
1293
1294/*
1295 * Compute the oldest restart LSN across all slots and inform xlog module.
1296 *
1297 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1298 * purpose, we don't try to account for that, because this module doesn't
1299 * know what to compare against.
1300 */
1301void
1303{
1304 int i;
1306
1308
1311 {
1313 XLogRecPtr restart_lsn;
1314 XLogRecPtr last_saved_restart_lsn;
1315 bool invalidated;
1316 ReplicationSlotPersistency persistency;
1317
1318 if (!s->in_use)
1319 continue;
1320
1322 persistency = s->data.persistency;
1323 restart_lsn = s->data.restart_lsn;
1324 invalidated = s->data.invalidated != RS_INVAL_NONE;
1325 last_saved_restart_lsn = s->last_saved_restart_lsn;
1327
1328 /* invalidated slots need not apply */
1329 if (invalidated)
1330 continue;
1331
1332 /*
1333 * For persistent slot use last_saved_restart_lsn to compute the
1334 * oldest LSN for removal of WAL segments. The segments between
1335 * last_saved_restart_lsn and restart_lsn might be needed by a
1336 * persistent slot in the case of database crash. Non-persistent
1337 * slots can't survive the database crash, so we don't care about
1338 * last_saved_restart_lsn for them.
1339 */
1340 if (persistency == RS_PERSISTENT)
1341 {
1342 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1343 restart_lsn > last_saved_restart_lsn)
1344 {
1345 restart_lsn = last_saved_restart_lsn;
1346 }
1347 }
1348
1349 if (XLogRecPtrIsValid(restart_lsn) &&
1351 restart_lsn < min_required))
1352 min_required = restart_lsn;
1353 }
1355
1357}
1358
1359/*
1360 * Compute the oldest WAL LSN required by *logical* decoding slots.
1361 *
1362 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1363 * slots exist.
1364 *
1365 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1366 * ignores physical replication slots.
1367 *
1368 * The results aren't required frequently, so we don't maintain a precomputed
1369 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1370 */
1373{
1375 int i;
1376
1378 return InvalidXLogRecPtr;
1379
1381
1383 {
1384 ReplicationSlot *s;
1385 XLogRecPtr restart_lsn;
1386 XLogRecPtr last_saved_restart_lsn;
1387 bool invalidated;
1388 ReplicationSlotPersistency persistency;
1389
1391
1392 /* cannot change while ReplicationSlotControlLock is held */
1393 if (!s->in_use)
1394 continue;
1395
1396 /* we're only interested in logical slots */
1397 if (!SlotIsLogical(s))
1398 continue;
1399
1400 /* read once, it's ok if it increases while we're checking */
1402 persistency = s->data.persistency;
1403 restart_lsn = s->data.restart_lsn;
1404 invalidated = s->data.invalidated != RS_INVAL_NONE;
1405 last_saved_restart_lsn = s->last_saved_restart_lsn;
1407
1408 /* invalidated slots need not apply */
1409 if (invalidated)
1410 continue;
1411
1412 /*
1413 * For a persistent slot, use last_saved_restart_lsn to compute the
1414 * oldest LSN for removal of WAL segments. The segments between
1415 * last_saved_restart_lsn and restart_lsn might be needed by a
1416 * persistent slot in the case of database crash. Non-persistent
1417 * slots can't survive the database crash, so we don't care about
1418 * last_saved_restart_lsn for them.
1419 */
1420 if (persistency == RS_PERSISTENT)
1421 {
1422 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1423 restart_lsn > last_saved_restart_lsn)
1424 {
1425 restart_lsn = last_saved_restart_lsn;
1426 }
1427 }
1428
1429 if (!XLogRecPtrIsValid(restart_lsn))
1430 continue;
1431
1432 if (!XLogRecPtrIsValid(result) ||
1433 restart_lsn < result)
1434 result = restart_lsn;
1435 }
1436
1438
1439 return result;
1440}
1441
1442/*
1443 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1444 * passed database oid.
1445 *
1446 * Returns true if there are any slots referencing the database. *nslots will
1447 * be set to the absolute number of slots in the database, *nactive to ones
1448 * currently active.
1449 */
1450bool
1452{
1453 int i;
1454
1455 *nslots = *nactive = 0;
1456
1458 return false;
1459
1462 {
1463 ReplicationSlot *s;
1464
1466
1467 /* cannot change while ReplicationSlotControlLock is held */
1468 if (!s->in_use)
1469 continue;
1470
1471 /* only logical slots are database specific, skip */
1472 if (!SlotIsLogical(s))
1473 continue;
1474
1475 /* not our database, skip */
1476 if (s->data.database != dboid)
1477 continue;
1478
1479 /* NB: intentionally counting invalidated slots */
1480
1481 /* count slots with spinlock held */
1483 (*nslots)++;
1485 (*nactive)++;
1487 }
1489
1490 if (*nslots > 0)
1491 return true;
1492 return false;
1493}
1494
1495/*
1496 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1497 * passed database oid. The caller should hold an exclusive lock on the
1498 * pg_database oid for the database to prevent creation of new slots on the db
1499 * or replay from existing slots.
1500 *
1501 * Another session that concurrently acquires an existing slot on the target DB
1502 * (most likely to drop it) may cause this function to ERROR. If that happens
1503 * it may have dropped some but not all slots.
1504 *
1505 * This routine isn't as efficient as it could be - but we don't drop
1506 * databases often, especially databases with lots of slots.
1507 *
1508 * If the last logical slot in the cluster is dropped, request to disable
1509 * logical decoding.
1510 */
1511void
1513{
1514 int i;
1516 bool dropped = false;
1517
1519 return;
1520
1521restart:
1525 {
1526 ReplicationSlot *s;
1527 char *slotname;
1528 ProcNumber active_proc;
1529
1531
1532 /* cannot change while ReplicationSlotControlLock is held */
1533 if (!s->in_use)
1534 continue;
1535
1536 /* only logical slots are database specific, skip */
1537 if (!SlotIsLogical(s))
1538 continue;
1539
1540 /*
1541 * Check logical slots on other databases too so we can disable
1542 * logical decoding only if no slots in the cluster.
1543 */
1547
1548 /* not our database, skip */
1549 if (s->data.database != dboid)
1550 continue;
1551
1552 /* NB: intentionally including invalidated slots to drop */
1553
1554 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1556 /* can't change while ReplicationSlotControlLock is held */
1557 slotname = NameStr(s->data.name);
1558 active_proc = s->active_proc;
1559 if (active_proc == INVALID_PROC_NUMBER)
1560 {
1563 }
1565
1566 /*
1567 * Even though we hold an exclusive lock on the database object a
1568 * logical slot for that DB can still be active, e.g. if it's
1569 * concurrently being dropped by a backend connected to another DB.
1570 *
1571 * That's fairly unlikely in practice, so we'll just bail out.
1572 *
1573 * The slot sync worker holds a shared lock on the database before
1574 * operating on synced logical slots to avoid conflict with the drop
1575 * happening here. The persistent synced slots are thus safe but there
1576 * is a possibility that the slot sync worker has created a temporary
1577 * slot (which stays active even on release) and we are trying to drop
1578 * that here. In practice, the chances of hitting this scenario are
1579 * low because, during slot synchronization, the temporary slot is
1580 * immediately converted to persistent and thus is safe due to the
1581 * shared lock taken on the database. So, we'll just bail out in such
1582 * a case.
1583 *
1584 * XXX: We can consider shutting down the slot sync worker before
1585 * trying to drop synced temporary slots here.
1586 */
1587 if (active_proc != INVALID_PROC_NUMBER)
1588 ereport(ERROR,
1590 errmsg("replication slot \"%s\" is active for PID %d",
1591 slotname, GetPGProcByNumber(active_proc)->pid)));
1592
1593 /*
1594 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1595 * holding ReplicationSlotControlLock over filesystem operations,
1596 * release ReplicationSlotControlLock and use
1597 * ReplicationSlotDropAcquired.
1598 *
1599 * As that means the set of slots could change, restart scan from the
1600 * beginning each time we release the lock.
1601 */
1604 dropped = true;
1605 goto restart;
1606 }
1608
1609 if (dropped && !found_valid_logicalslot)
1611}
1612
1613/*
1614 * Returns true if there is at least one in-use valid logical replication slot.
1615 */
1616bool
1618{
1619 bool found = false;
1620
1622 return false;
1623
1626 {
1627 ReplicationSlot *s;
1628 bool invalidated;
1629
1631
1632 /* cannot change while ReplicationSlotControlLock is held */
1633 if (!s->in_use)
1634 continue;
1635
1636 if (SlotIsPhysical(s))
1637 continue;
1638
1640 invalidated = s->data.invalidated != RS_INVAL_NONE;
1642
1643 if (invalidated)
1644 continue;
1645
1646 found = true;
1647 break;
1648 }
1650
1651 return found;
1652}
1653
1654/*
1655 * Check whether the server's configuration supports using replication
1656 * slots.
1657 */
1658void
1660{
1661 /*
1662 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1663 * needs the same check.
1664 */
1665
1666 if (!repack && max_replication_slots == 0)
1667 ereport(ERROR,
1669 errmsg("replication slots can only be used if \"%s\" > 0",
1670 "max_replication_slots"));
1671
1673 ereport(ERROR,
1675 errmsg("REPACK can only be used if \"%s\" > 0",
1676 "max_repack_replication_slots"));
1677
1679 ereport(ERROR,
1681 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1682}
1683
1684/*
1685 * Check whether the user has privilege to use replication slots.
1686 */
1687void
1689{
1691 ereport(ERROR,
1693 errmsg("permission denied to use replication slots"),
1694 errdetail("Only roles with the %s attribute may use replication slots.",
1695 "REPLICATION")));
1696}
1697
1698/*
1699 * Reserve WAL for the currently active slot.
1700 *
1701 * Compute and set restart_lsn in a manner that's appropriate for the type of
1702 * the slot and concurrency safe.
1703 */
1704void
1706{
1708 XLogSegNo segno;
1709 XLogRecPtr restart_lsn;
1710
1711 Assert(slot != NULL);
1714
1715 /*
1716 * The replication slot mechanism is used to prevent the removal of
1717 * required WAL.
1718 *
1719 * Acquire an exclusive lock to prevent the checkpoint process from
1720 * concurrently computing the minimum slot LSN (see
1721 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1722 * replication cannot be removed during a checkpoint.
1723 *
1724 * The mechanism is reliable because if WAL reservation occurs first, the
1725 * checkpoint must wait for the restart_lsn update before determining the
1726 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1727 * first, subsequent WAL reservations will select positions at or beyond
1728 * the redo pointer of that checkpoint.
1729 */
1731
1732 /*
1733 * For logical slots log a standby snapshot and start logical decoding at
1734 * exactly that position. That allows the slot to start up more quickly.
1735 * But on a standby we cannot do WAL writes, so just use the replay
1736 * pointer; effectively, an attempt to create a logical slot on standby
1737 * will cause it to wait for an xl_running_xact record to be logged
1738 * independently on the primary, so that a snapshot can be built using the
1739 * record.
1740 *
1741 * None of this is needed (or indeed helpful) for physical slots as
1742 * they'll start replay at the last logged checkpoint anyway. Instead,
1743 * return the location of the last redo LSN, where a base backup has to
1744 * start replay at.
1745 */
1746 if (SlotIsPhysical(slot))
1747 restart_lsn = GetRedoRecPtr();
1748 else if (RecoveryInProgress())
1749 restart_lsn = GetXLogReplayRecPtr(NULL);
1750 else
1751 restart_lsn = GetXLogInsertRecPtr();
1752
1753 SpinLockAcquire(&slot->mutex);
1754 slot->data.restart_lsn = restart_lsn;
1755 SpinLockRelease(&slot->mutex);
1756
1757 /* prevent WAL removal as fast as possible */
1759
1760 /* Checkpoint shouldn't remove the required WAL. */
1762 if (XLogGetLastRemovedSegno() >= segno)
1763 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1764 NameStr(slot->data.name));
1765
1767
1768 if (!RecoveryInProgress() && SlotIsLogical(slot))
1769 {
1771
1772 /* make sure we have enough information to start */
1774
1775 /* and make sure it's fsynced to disk */
1777 }
1778}
1779
1780/*
1781 * Report that replication slot needs to be invalidated
1782 */
1783static void
1785 bool terminating,
1786 int pid,
1787 NameData slotname,
1788 XLogRecPtr restart_lsn,
1790 TransactionId snapshotConflictHorizon,
1791 long slot_idle_seconds)
1792{
1795
1798
1799 switch (cause)
1800 {
1802 {
1803 uint64 ex = oldestLSN - restart_lsn;
1804
1806 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1807 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1808 ex),
1809 LSN_FORMAT_ARGS(restart_lsn),
1810 ex);
1811 /* translator: %s is a GUC variable name */
1812 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1813 "max_slot_wal_keep_size");
1814 break;
1815 }
1816 case RS_INVAL_HORIZON:
1817 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1818 snapshotConflictHorizon);
1819 break;
1820
1821 case RS_INVAL_WAL_LEVEL:
1822 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1823 break;
1824
1826 {
1827 /* translator: %s is a GUC variable name */
1828 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1829 slot_idle_seconds, "idle_replication_slot_timeout",
1831 /* translator: %s is a GUC variable name */
1832 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1833 "idle_replication_slot_timeout");
1834 break;
1835 }
1836 case RS_INVAL_NONE:
1838 }
1839
1840 ereport(LOG,
1841 terminating ?
1842 errmsg("terminating process %d to release replication slot \"%s\"",
1843 pid, NameStr(slotname)) :
1844 errmsg("invalidating obsolete replication slot \"%s\"",
1845 NameStr(slotname)),
1846 errdetail_internal("%s", err_detail.data),
1847 err_hint.len ? errhint("%s", err_hint.data) : 0);
1848
1849 pfree(err_detail.data);
1850 pfree(err_hint.data);
1851}
1852
1853/*
1854 * Can we invalidate an idle replication slot?
1855 *
1856 * Idle timeout invalidation is allowed only when:
1857 *
1858 * 1. Idle timeout is set
1859 * 2. Slot has reserved WAL
1860 * 3. Slot is inactive
1861 * 4. The slot is not being synced from the primary while the server is in
1862 * recovery. Synced slots are always considered to be inactive, since
1863 * they don't perform logical decoding to produce changes.
1864 */
1865static inline bool
1873
1874/*
1875 * DetermineSlotInvalidationCause - Determine the cause for which a slot
1876 * becomes invalid among the given possible causes.
1877 *
1878 * This function sequentially checks all possible invalidation causes and
1879 * returns the first one for which the slot is eligible for invalidation.
1880 */
1883 XLogRecPtr oldestLSN, Oid dboid,
1884 TransactionId snapshotConflictHorizon,
1885 TimestampTz *inactive_since, TimestampTz now)
1886{
1888
1890 {
1891 XLogRecPtr restart_lsn = s->data.restart_lsn;
1892
1893 if (XLogRecPtrIsValid(restart_lsn) &&
1894 restart_lsn < oldestLSN)
1895 return RS_INVAL_WAL_REMOVED;
1896 }
1897
1899 {
1900 /* invalid DB oid signals a shared relation */
1901 if (SlotIsLogical(s) &&
1902 (dboid == InvalidOid || dboid == s->data.database))
1903 {
1904 TransactionId effective_xmin = s->effective_xmin;
1906
1907 if (TransactionIdIsValid(effective_xmin) &&
1908 TransactionIdPrecedesOrEquals(effective_xmin,
1909 snapshotConflictHorizon))
1910 return RS_INVAL_HORIZON;
1913 snapshotConflictHorizon))
1914 return RS_INVAL_HORIZON;
1915 }
1916 }
1917
1919 {
1920 if (SlotIsLogical(s))
1921 return RS_INVAL_WAL_LEVEL;
1922 }
1923
1925 {
1926 Assert(now > 0);
1927
1928 if (CanInvalidateIdleSlot(s))
1929 {
1930 /*
1931 * Simulate the invalidation due to idle_timeout to test the
1932 * timeout behavior promptly, without waiting for it to trigger
1933 * naturally.
1934 */
1935#ifdef USE_INJECTION_POINTS
1936 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1937 {
1938 *inactive_since = 0; /* since the beginning of time */
1939 return RS_INVAL_IDLE_TIMEOUT;
1940 }
1941#endif
1942
1943 /*
1944 * Check if the slot needs to be invalidated due to
1945 * idle_replication_slot_timeout GUC.
1946 */
1949 {
1950 *inactive_since = s->inactive_since;
1951 return RS_INVAL_IDLE_TIMEOUT;
1952 }
1953 }
1954 }
1955
1956 return RS_INVAL_NONE;
1957}
1958
1959/*
1960 * Helper for InvalidateObsoleteReplicationSlots
1961 *
1962 * Acquires the given slot and marks it invalid, if necessary and possible.
1963 *
1964 * Returns true if the slot was invalidated.
1965 *
1966 * Set *released_lock_out if ReplicationSlotControlLock was released in the
1967 * interim (and in that case we're not holding the lock at return, otherwise
1968 * we are).
1969 *
1970 * This is inherently racy, because we release the LWLock
1971 * for syscalls, so caller must restart if we return true.
1972 */
1973static bool
1975 ReplicationSlot *s,
1977 Oid dboid, TransactionId snapshotConflictHorizon,
1978 bool *released_lock_out)
1979{
1980 int last_signaled_pid = 0;
1981 bool released_lock = false;
1982 bool invalidated = false;
1983 TimestampTz inactive_since = 0;
1984
1985 for (;;)
1986 {
1987 XLogRecPtr restart_lsn;
1988 NameData slotname;
1989 ProcNumber active_proc;
1990 int active_pid = 0;
1992 TimestampTz now = 0;
1993 long slot_idle_secs = 0;
1994
1996
1997 if (!s->in_use)
1998 {
1999 if (released_lock)
2001 break;
2002 }
2003
2005 {
2006 /*
2007 * Assign the current time here to avoid system call overhead
2008 * while holding the spinlock in subsequent code.
2009 */
2011 }
2012
2013 /*
2014 * Check if the slot needs to be invalidated. If it needs to be
2015 * invalidated, and is not currently acquired, acquire it and mark it
2016 * as having been invalidated. We do this with the spinlock held to
2017 * avoid race conditions -- for example the restart_lsn could move
2018 * forward, or the slot could be dropped.
2019 */
2021
2022 restart_lsn = s->data.restart_lsn;
2023
2024 /* we do nothing if the slot is already invalid */
2025 if (s->data.invalidated == RS_INVAL_NONE)
2027 s, oldestLSN,
2028 dboid,
2029 snapshotConflictHorizon,
2030 &inactive_since,
2031 now);
2032
2033 /* if there's no invalidation, we're done */
2035 {
2037 if (released_lock)
2039 break;
2040 }
2041
2042 slotname = s->data.name;
2043 active_proc = s->active_proc;
2044
2045 /*
2046 * If the slot can be acquired, do so and mark it invalidated
2047 * immediately. Otherwise we'll signal the owning process, below, and
2048 * retry.
2049 *
2050 * Note: Unlike other slot attributes, slot's inactive_since can't be
2051 * changed until the acquired slot is released or the owning process
2052 * is terminated. So, the inactive slot can only be invalidated
2053 * immediately without being terminated.
2054 */
2055 if (active_proc == INVALID_PROC_NUMBER)
2056 {
2060
2061 /*
2062 * XXX: We should consider not overwriting restart_lsn and instead
2063 * just rely on .invalidated.
2064 */
2066 {
2069 }
2070
2071 /* Let caller know */
2072 invalidated = true;
2073 }
2074 else
2075 {
2076 active_pid = GetPGProcByNumber(active_proc)->pid;
2077 Assert(active_pid != 0);
2078 }
2079
2081
2082 /*
2083 * Calculate the idle time duration of the slot if slot is marked
2084 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2085 */
2087 {
2088 int slot_idle_usecs;
2089
2090 TimestampDifference(inactive_since, now, &slot_idle_secs,
2092 }
2093
2094 if (active_proc != INVALID_PROC_NUMBER)
2095 {
2096 /*
2097 * Prepare the sleep on the slot's condition variable before
2098 * releasing the lock, to close a possible race condition if the
2099 * slot is released before the sleep below.
2100 */
2102
2104 released_lock = true;
2105
2106 /*
2107 * Signal to terminate the process that owns the slot, if we
2108 * haven't already signalled it. (Avoidance of repeated
2109 * signalling is the only reason for there to be a loop in this
2110 * routine; otherwise we could rely on caller's restart loop.)
2111 *
2112 * There is a race condition: another process may acquire the slot
2113 * after its current owner process is terminated and before this
2114 * process acquires it. To handle that, we signal only if the PID of
2115 * the owning process has changed from the previous time. (This
2116 * logic assumes that the same PID is not reused very quickly.)
2117 */
2119 {
2121 slotname, restart_lsn,
2122 oldestLSN, snapshotConflictHorizon,
2124
2125 if (MyBackendType == B_STARTUP)
2127 active_pid,
2129 else
2130 (void) kill(active_pid, SIGTERM);
2131
2133 }
2134
2135 /* Wait until the slot is released. */
2138
2139 /*
2140 * Re-acquire lock and start over; we expect to invalidate the
2141 * slot next time (unless another process acquires the slot in the
2142 * meantime).
2143 *
2144 * Note: It is possible for a slot to advance its restart_lsn or
2145 * xmin values sufficiently between when we release the mutex and
2146 * when we recheck, moving from a conflicting state to a
2147 * non-conflicting state. This is intentional and safe: if the slot
2148 * has caught up while we're busy here, the resources we were
2149 * concerned about (WAL segments or tuples) have not yet been
2150 * removed, and there's no reason to invalidate the slot.
2151 */
2153 continue;
2154 }
2155 else
2156 {
2157 /*
2158 * We hold the slot now and have already invalidated it; flush it
2159 * to ensure that state persists.
2160 *
2161 * Don't want to hold ReplicationSlotControlLock across file
2162 * system operations, so release it now but be sure to tell caller
2163 * to restart from scratch.
2164 */
2166 released_lock = true;
2167
2168 /* Make sure the invalidated state persists across server restart */
2172
2174 slotname, restart_lsn,
2175 oldestLSN, snapshotConflictHorizon,
2177
2178 /* done with this slot for now */
2179 break;
2180 }
2181 }
2182
2184
2186 return invalidated;
2187}
2188
2189/*
2190 * Invalidate slots that require resources about to be removed.
2191 *
2192 * Returns true if any slot was invalidated.
2193 *
2194 * Whether a slot needs to be invalidated depends on the invalidation cause.
2195 * A slot is invalidated if it:
2196 * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2197 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2198 * db; dboid may be InvalidOid for shared relations
2199 * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2200 * logical.
2201 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2202 * "idle_replication_slot_timeout" duration.
2203 *
2204 * Note: This function attempts to invalidate the slot for multiple possible
2205 * causes in a single pass, minimizing redundant iterations. The "cause"
2206 * parameter can be a MASK representing one or more of the defined causes.
2207 *
2208 * If it invalidates the last logical slot in the cluster, it requests to
2209 * disable logical decoding.
2210 *
2211 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2212 */
2213bool
2215 XLogSegNo oldestSegno, Oid dboid,
2216 TransactionId snapshotConflictHorizon)
2217{
2219 bool invalidated = false;
2220 bool invalidated_logical = false;
2222
2223 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2226
2228 return invalidated;
2229
2231
2232restart:
2236 {
2238 bool released_lock = false;
2239
2240 if (!s->in_use)
2241 continue;
2242
2243 /* Prevent invalidation of logical slots during binary upgrade */
2245 {
2249
2250 continue;
2251 }
2252
2254 dboid, snapshotConflictHorizon,
2255 &released_lock))
2256 {
2258
2259 /* Remember we have invalidated a physical or logical slot */
2260 invalidated = true;
2261
2262 /*
2263 * Additionally, remember we have invalidated a logical slot as we
2264 * can request disabling logical decoding later.
2265 */
2266 if (SlotIsLogical(s))
2267 invalidated_logical = true;
2268 }
2269 else
2270 {
2271 /*
2272 * We need to check if the slot is invalidated here since
2273 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2274 * is already invalidated.
2275 */
2280 }
2281
2282 /* if the lock was released, start from scratch */
2283 if (released_lock)
2284 goto restart;
2285 }
2287
2288 /*
2289 * If any slots have been invalidated, recalculate the resource limits.
2290 */
2291 if (invalidated)
2292 {
2295 }
2296
2297 /*
2298 * Request the checkpointer to disable logical decoding if no valid
2299 * logical slots remain. If called by the checkpointer during a
2300 * checkpoint, only the request is initiated; actual deactivation is
2301 * deferred until after the checkpoint completes.
2302 */
2305
2306 return invalidated;
2307}
2308
2309/*
2310 * Flush all replication slots to disk.
2311 *
2312 * It is convenient to flush dirty replication slots at the time of checkpoint.
2313 * Additionally, in case of a shutdown checkpoint, we also identify the slots
2314 * for which the confirmed_flush LSN has been updated since the last time it
2315 * was saved and flush them.
2316 */
2317void
2319{
2320 int i;
2321 bool last_saved_restart_lsn_updated = false;
2322
2323 elog(DEBUG1, "performing replication slot checkpoint");
2324
2325 /*
2326 * Prevent any slot from being created/dropped while we're active. As we
2327 * explicitly do *not* want to block iterating over replication_slots or
2328 * acquiring a slot we cannot take the control lock - but that's OK,
2329 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2330 * enough to guarantee that nobody can change the in_use bits on us.
2331 *
2332 * Additionally, acquiring the Allocation lock is necessary to serialize
2333 * the slot flush process with concurrent slot WAL reservation. This
2334 * ensures that the WAL position being reserved is either flushed to disk
2335 * or is beyond or equal to the redo pointer of the current checkpoint
2336 * (See ReplicationSlotReserveWal for details).
2337 */
2339
2341 {
2343 char path[MAXPGPATH];
2344
2345 if (!s->in_use)
2346 continue;
2347
2348 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2349 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2350
2351 /*
2352 * Slot's data is not flushed each time the confirmed_flush LSN is
2353 * updated as that could lead to frequent writes. However, we decide
2354 * to force a flush of all logical slot's data at the time of shutdown
2355 * if the confirmed_flush LSN is changed since we last flushed it to
2356 * disk. This helps in avoiding an unnecessary retreat of the
2357 * confirmed_flush LSN after restart.
2358 */
2359 if (is_shutdown && SlotIsLogical(s))
2360 {
2362
2363 if (s->data.invalidated == RS_INVAL_NONE &&
2365 {
2366 s->just_dirtied = true;
2367 s->dirty = true;
2368 }
2370 }
2371
2372 /*
2373 * Track if we're going to update slot's last_saved_restart_lsn. We
2374 * need this to know if we need to recompute the required LSN.
2375 */
2378
2379 SaveSlotToPath(s, path, LOG);
2380 }
2382
2383 /*
2384 * Recompute the required LSN if SaveSlotToPath() updated
2385 * last_saved_restart_lsn for any slot.
2386 */
2389}
2390
2391/*
2392 * Load all replication slots from disk into memory at server startup. This
2393 * needs to be run before we start crash recovery.
2394 */
2395void
2397{
2399 struct dirent *replication_de;
2400
2401 elog(DEBUG1, "starting up replication slots");
2402
2403 /* restore all slots by iterating over all on-disk entries */
2406 {
2407 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2409
2410 if (strcmp(replication_de->d_name, ".") == 0 ||
2411 strcmp(replication_de->d_name, "..") == 0)
2412 continue;
2413
2414 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2416
2417 /* we're only creating directories here, skip if it's not ours */
2419 continue;
2420
2421 /* we crashed while a slot was being set up or deleted, clean up */
2422 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2423 {
2424 if (!rmtree(path, true))
2425 {
2427 (errmsg("could not remove directory \"%s\"",
2428 path)));
2429 continue;
2430 }
2432 continue;
2433 }
2434
2435 /* looks like a slot in a normal state, restore */
2437 }
2439
2440 /* currently no slots exist, we're done. */
2442 return;
2443
2444 /* Now that we have recovered all the data, compute replication xmin */
2447}
2448
2449/* ----
2450 * Manipulation of on-disk state of replication slots
2451 *
2452 * NB: none of the routines below should take any notice whether a slot is the
2453 * current one or not, that's all handled a layer above.
2454 * ----
2455 */
2456static void
2458{
2459 char tmppath[MAXPGPATH];
2460 char path[MAXPGPATH];
2461 struct stat st;
2462
2463 /*
2464 * No need to take out the io_in_progress_lock: nobody else can see this
2465 * slot yet, so nobody else will write. We're reusing SaveSlotToPath, which
2466 * takes out the lock; if we took the lock here, we'd deadlock.
2467 */
2468
2469 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2470 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2471
2472 /*
2473 * It's just barely possible that some previous effort to create or drop a
2474 * slot with this name left a temp directory lying around. If that seems
2475 * to be the case, try to remove it. If the rmtree() fails, we'll error
2476 * out at the MakePGDirectory() below, so we don't bother checking
2477 * success.
2478 */
2479 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2480 rmtree(tmppath, true);
2481
2482 /* Create and fsync the temporary slot directory. */
2483 if (MakePGDirectory(tmppath) < 0)
2484 ereport(ERROR,
2486 errmsg("could not create directory \"%s\": %m",
2487 tmppath)));
2488 fsync_fname(tmppath, true);
2489
2490 /* Write the actual state file. */
2491 slot->dirty = true; /* signal that we really need to write */
2493
2494 /* Rename the directory into place. */
2495 if (rename(tmppath, path) != 0)
2496 ereport(ERROR,
2498 errmsg("could not rename file \"%s\" to \"%s\": %m",
2499 tmppath, path)));
2500
2501 /*
2502 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2503 * would persist after an OS crash or not - so, force a restart. The
2504 * restart would try to fsync this again till it works.
2505 */
2507
2508 fsync_fname(path, true);
2510
2512}
2513
2514/*
2515 * Shared functionality between saving and creating a replication slot.
2516 */
2517static void
2518SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2519{
2520 char tmppath[MAXPGPATH];
2521 char path[MAXPGPATH];
2522 int fd;
2524 bool was_dirty;
2525
2526 /* first check whether there's something to write out */
2527 SpinLockAcquire(&slot->mutex);
2528 was_dirty = slot->dirty;
2529 slot->just_dirtied = false;
2530 SpinLockRelease(&slot->mutex);
2531
2532 /* and don't do anything if there's nothing to write */
2533 if (!was_dirty)
2534 return;
2535
2537
2538 /* silence valgrind :( */
2539 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2540
2541 sprintf(tmppath, "%s/state.tmp", dir);
2542 sprintf(path, "%s/state", dir);
2543
2545 if (fd < 0)
2546 {
2547 /*
2548 * If not an ERROR, then release the lock before returning. In case
2549 * of an ERROR, the error recovery path automatically releases the
2550 * lock, but no harm in explicitly releasing even in that case. Note
2551 * that LWLockRelease() could affect errno.
2552 */
2553 int save_errno = errno;
2554
2556 errno = save_errno;
2557 ereport(elevel,
2559 errmsg("could not create file \"%s\": %m",
2560 tmppath)));
2561 return;
2562 }
2563
2564 cp.magic = SLOT_MAGIC;
2565 INIT_CRC32C(cp.checksum);
2566 cp.version = SLOT_VERSION;
2568
2569 SpinLockAcquire(&slot->mutex);
2570
2571 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2572
2573 SpinLockRelease(&slot->mutex);
2574
2575 COMP_CRC32C(cp.checksum,
2578 FIN_CRC32C(cp.checksum);
2579
2580 errno = 0;
2582 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2583 {
2584 int save_errno = errno;
2585
2588 unlink(tmppath);
2590
2591 /* if write didn't set errno, assume problem is no disk space */
2593 ereport(elevel,
2595 errmsg("could not write to file \"%s\": %m",
2596 tmppath)));
2597 return;
2598 }
2600
2601 /* fsync the temporary file */
2603 if (pg_fsync(fd) != 0)
2604 {
2605 int save_errno = errno;
2606
2609 unlink(tmppath);
2611
2612 errno = save_errno;
2613 ereport(elevel,
2615 errmsg("could not fsync file \"%s\": %m",
2616 tmppath)));
2617 return;
2618 }
2620
2621 if (CloseTransientFile(fd) != 0)
2622 {
2623 int save_errno = errno;
2624
2625 unlink(tmppath);
2627
2628 errno = save_errno;
2629 ereport(elevel,
2631 errmsg("could not close file \"%s\": %m",
2632 tmppath)));
2633 return;
2634 }
2635
2636 /* rename to permanent file, fsync file and directory */
2637 if (rename(tmppath, path) != 0)
2638 {
2639 int save_errno = errno;
2640
2641 unlink(tmppath);
2643
2644 errno = save_errno;
2645 ereport(elevel,
2647 errmsg("could not rename file \"%s\" to \"%s\": %m",
2648 tmppath, path)));
2649 return;
2650 }
2651
2652 /*
2653 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2654 */
2656
2657 fsync_fname(path, false);
2658 fsync_fname(dir, true);
2660
2662
2663 /*
2664 * Successfully wrote, unset dirty bit, unless somebody dirtied again
2665 * already and remember the confirmed_flush LSN value.
2666 */
2667 SpinLockAcquire(&slot->mutex);
2668 if (!slot->just_dirtied)
2669 slot->dirty = false;
2670 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2671 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2672 SpinLockRelease(&slot->mutex);
2673
2675}
2676
2677/*
2678 * Load a single slot from disk into memory.
 *
 * A leftover "state.tmp" file is removed first.  The slot's "state" file is
 * then fsynced, read and checked (magic number, version, length and CRC);
 * any failure along the way is reported at PANIC.  A slot whose persistency
 * is not RS_PERSISTENT is not restored; its directory is removed instead.
 * Otherwise the persistent data is copied into the first slot array entry
 * that is not in use.  FATAL is raised if the slot-type requirements checked
 * below are not met, or if no free entry is found.
2679 */
2680static void
2682{
2684 int i;
2685 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2686 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2687 int fd;
2688 bool restored = false;
2689 int readBytes;
2690 pg_crc32c checksum;
2691 TimestampTz now = 0;
2692
2693 /* no need to lock here, no concurrent access allowed yet */
2694
2695 /* delete temp file if it exists */
2696 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2697 sprintf(path, "%s/state.tmp", slotdir);
2698 if (unlink(path) < 0 && errno != ENOENT)
2699 ereport(PANIC,
2701 errmsg("could not remove file \"%s\": %m", path)));
2702
2703 sprintf(path, "%s/state", slotdir);
2704
2705 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2706
2707 /* on some operating systems fsyncing a file requires O_RDWR */
2709
2710 /*
2711 * We do not need to handle a missing state file gracefully: the slot
2712 * directory is rename()d into place only after the state file was fsync()ed.
2713 */
2714 if (fd < 0)
2715 ereport(PANIC,
2717 errmsg("could not open file \"%s\": %m", path)));
2718
2719 /*
2720 * Sync state file before we're reading from it. We might have crashed
2721 * while it wasn't synced yet and we shouldn't continue on that basis.
2722 */
2724 if (pg_fsync(fd) != 0)
2725 ereport(PANIC,
2727 errmsg("could not fsync file \"%s\": %m",
2728 path)));
2730
2731 /* Also sync the parent directory */
2733 fsync_fname(slotdir, true);
2735
2736 /* read part of statefile that's guaranteed to be version independent */
2741 {
2742 if (readBytes < 0)
2743 ereport(PANIC,
2745 errmsg("could not read file \"%s\": %m", path)));
2746 else
2747 ereport(PANIC,
2749 errmsg("could not read file \"%s\": read %d of %zu",
2750 path, readBytes,
2752 }
2753
2754 /* verify magic */
2755 if (cp.magic != SLOT_MAGIC)
2756 ereport(PANIC,
2758 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2759 path, cp.magic, SLOT_MAGIC)));
2760
2761 /* verify version */
2762 if (cp.version != SLOT_VERSION)
2763 ereport(PANIC,
2765 errmsg("replication slot file \"%s\" has unsupported version %u",
2766 path, cp.version)));
2767
2768 /* boundary check on length */
2769 if (cp.length != ReplicationSlotOnDiskV2Size)
2770 ereport(PANIC,
2772 errmsg("replication slot file \"%s\" has corrupted length %u",
2773 path, cp.length)));
2774
2775 /* Now that we know the size, read the cp.length bytes that follow */
2777 readBytes = read(fd,
2779 cp.length);
2781 if (readBytes != cp.length)
2782 {
2783 if (readBytes < 0)
2784 ereport(PANIC,
2786 errmsg("could not read file \"%s\": %m", path)));
2787 else
2788 ereport(PANIC,
2790 errmsg("could not read file \"%s\": read %d of %zu",
2791 path, readBytes, (Size) cp.length)));
2792 }
2793
2794 if (CloseTransientFile(fd) != 0)
2795 ereport(PANIC,
2797 errmsg("could not close file \"%s\": %m", path)));
2798
2799 /* now verify the CRC */
2800 INIT_CRC32C(checksum);
2801 COMP_CRC32C(checksum,
2804 FIN_CRC32C(checksum);
2805
2806 if (!EQ_CRC32C(checksum, cp.checksum))
2807 ereport(PANIC,
2808 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2809 path, checksum, cp.checksum)));
2810
2811 /*
2812 * If the slot is not persistent (e.g. we crashed with an ephemeral slot
2813 * active), don't restore it but delete its directory.
2814 */
2815 if (cp.slotdata.persistency != RS_PERSISTENT)
2816 {
2817 if (!rmtree(slotdir, true))
2818 {
2820 (errmsg("could not remove directory \"%s\"",
2821 slotdir)));
2822 }
2824 return;
2825 }
2826
2827 /*
2828 * Verify that requirements for the specific slot type are met. That's
2829 * important because if these aren't met we're not guaranteed to retain
2830 * all the necessary resources for the slot.
2831 *
2832 * NB: We have to do so *after* the above checks for ephemeral slots,
2833 * because otherwise a slot that shouldn't exist anymore could prevent
2834 * restarts.
2835 *
2836 * NB: Changing the requirements here also requires adapting
2837 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2838 */
2839 if (cp.slotdata.database != InvalidOid)
2840 {
2842 ereport(FATAL,
2844 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2845 NameStr(cp.slotdata.name)),
2846 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2847
2848 /*
2849 * In standby mode, the hot standby must be enabled. This check is
2850 * necessary to ensure logical slots are invalidated when they become
2851 * incompatible due to insufficient wal_level. Otherwise, if the
2852 * primary disabled logical decoding (reducing effective_wal_level
2853 * below logical) while hot standby is disabled, logical slots would
2854 * remain valid even after promotion.
2855 */
2857 ereport(FATAL,
2859 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2860 NameStr(cp.slotdata.name)),
2861 errhint("Change \"hot_standby\" to be \"on\".")));
2862 }
2863 else if (wal_level < WAL_LEVEL_REPLICA)
2864 ereport(FATAL,
2866 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2867 NameStr(cp.slotdata.name)),
2868 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2869
2870 /*
2871 * Nothing can be active yet, don't lock anything. Note we iterate up to
2872 * max_replication_slots instead of adding max_repack_replication_slots as
2873 * in all other places, because we must enforce the GUC value in case
2874 * there were more slots before the shutdown than its current setting
2875 * allows.
2876 */
2877 for (i = 0; i < max_replication_slots; i++)
2878 {
2879 ReplicationSlot *slot;
2880
2882
2883 if (slot->in_use)
2884 continue;
2885
2886 /* restore the entire set of persistent data */
2887 memcpy(&slot->data, &cp.slotdata,
2889
2890 /* initialize in memory state */
2891 slot->effective_xmin = cp.slotdata.xmin;
2892 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2893 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2894 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2895
2900
2901 slot->in_use = true;
2903
2904 /*
2905 * Set the time since the slot has become inactive after loading the
2906 * slot from the disk into memory. Whoever acquires the slot i.e.
2907 * makes the slot active will reset it. Use the same inactive_since
2908 * time for all the slots.
2909 */
2910 if (now == 0)
2912
2914
2915 restored = true;
2916 break;
2917 }
2918
2919 if (!restored)
2920 ereport(FATAL,
2921 (errmsg("too many replication slots active before shutdown"),
2922 errhint("Increase \"max_replication_slots\" and try again.")));
2923}
2924
2925/*
2926 * Maps the name of an invalidation reason for a replication slot to the
2927 * matching ReplicationSlotInvalidationCause.
 *
 * cause_name must not be NULL.  It is compared with strcmp() against the
 * entries of SlotInvalidationCauses[] at indexes 0 through
 * RS_INVAL_MAX_CAUSES inclusive.  A name with no matching entry is treated
 * as a programming error and trips Assert(false); the trailing return of
 * RS_INVAL_NONE only keeps the compiler quiet.
2928 */
2930GetSlotInvalidationCause(const char *cause_name)
2931{
2932 Assert(cause_name);
2933
2934 /* Scan the lookup table (inclusive upper bound) for this name */
2935 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2936 {
2937 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2939 }
2940
2941 Assert(false);
2942 return RS_INVAL_NONE; /* to keep compiler quiet */
2943}
2944
2945/*
2946 * Maps a ReplicationSlotInvalidationCause to the name of the invalidation
2947 * reason for a replication slot.
 *
 * The entries of SlotInvalidationCauses[] at indexes 0 through
 * RS_INVAL_MAX_CAUSES inclusive are searched for the given cause.  A cause
 * with no matching entry is treated as a programming error and trips
 * Assert(false); the trailing return of "none" only keeps the compiler
 * quiet.
2948 */
2949const char *
2951{
2952 /* Scan the lookup table (inclusive upper bound) for this cause */
2953 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2954 {
2955 if (SlotInvalidationCauses[i].cause == cause)
2957 }
2958
2959 Assert(false);
2960 return "none"; /* to keep compiler quiet */
2961}
2962
2963/*
2964 * A helper function to validate the slot names specified in GUC synchronized_standby_slots.
2965 *
2966 * The rawname will be parsed, and the result will be saved into *elemlist.
 *
 * Returns true if the list syntax is valid and every name passes validation.
 * Otherwise a GUC check error detail is set and false is returned.
2967 */
2968static bool
2970{
2971 /* Verify syntax and parse string into a list of identifiers */
2973 {
2974 GUC_check_errdetail("List syntax is invalid.");
2975 return false;
2976 }
2977
2978 /* Iterate the list to validate each slot name; stop at the first bad one */
2979 foreach_ptr(char, name, *elemlist)
2980 {
2981 int err_code;
2982 char *err_msg = NULL;
2983 char *err_hint = NULL;
2984
2986 &err_msg, &err_hint))
2987 {
2989 GUC_check_errdetail("%s", err_msg);
2990 if (err_hint != NULL)
2992 return false;
2993 }
2994 }
2995
2996 return true;
2997}
2998
2999/*
3000 * GUC check_hook for synchronized_standby_slots
 *
 * An empty string is accepted as-is.  Otherwise, on success, *extra is set to
 * a guc_malloc'd SyncStandbySlotsConfigData holding the number of slot names
 * and the names themselves, stored back to back in slot_names as
 * NUL-terminated strings.  Returns false if validation or the allocation
 * fails.
3001 */
3002bool
3004{
3005 char *rawname;
3006 char *ptr;
3007 List *elemlist;
3008 int size;
3009 bool ok;
3011
3012 if ((*newval)[0] == '\0')
3013 return true;
3014
3015 /* Need a modifiable copy of the GUC string */
3017
3018 /* Now parse the list and validate the slot names (syntax only; slot existence and type are not checked here) */
3020
3021 if (!ok || elemlist == NIL)
3022 {
3023 pfree(rawname);
3025 return ok;
3026 }
3027
3028 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3029 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3030 foreach_ptr(char, slot_name, elemlist)
3031 size += strlen(slot_name) + 1;
3032
3033 /* GUC extra value must be guc_malloc'd, not palloc'd */
3034 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3035 if (!config)
3036 return false; /* NOTE(review): rawname is not pfree'd on this path */
3037
3038 /* Transform the data into SyncStandbySlotsConfigData */
3039 config->nslotnames = list_length(elemlist);
3040
 /* Pack the names contiguously, each followed by its terminating NUL */
3041 ptr = config->slot_names;
3042 foreach_ptr(char, slot_name, elemlist)
3043 {
3044 strcpy(ptr, slot_name);
3045 ptr += strlen(slot_name) + 1;
3046 }
3047
3048 *extra = config;
3049
3050 pfree(rawname);
3052 return true;
3053}
3054
3055/*
3056 * GUC assign_hook for synchronized_standby_slots
3057 */
3058void
3060{
3061 /*
3062 * The set of standby slots may have changed, so the oldest LSN must be
3063 * recomputed.
3064 */
3066
3068}
3069
3070/*
3071 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
 *
 * Returns true if slot_name exactly matches (strcmp) one of the nslotnames
 * configured names, false otherwise.
3072 */
3073bool
3074SlotExistsInSyncStandbySlots(const char *slot_name)
3075{
3076 const char *standby_slot_name;
3077
3078 /* Return false if there is no value in synchronized_standby_slots */
3080 return false;
3081
3082 /*
3083 * XXX: We are not expecting this list to be long, so a linear search
3084 * shouldn't hurt; if that turns out not to be true, we can cache
3085 * this information for each WalSender as well.
3086 */
3088 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3089 {
3090 if (strcmp(standby_slot_name, slot_name) == 0)
3091 return true;
3092
3094 }
3095
3096 return false;
3097}
3098
3099/*
3100 * Return true if the slots specified in synchronized_standby_slots have caught up to
3101 * the given WAL location, false otherwise.
3102 *
3103 * The elevel parameter specifies the error level used for logging messages
3104 * related to slots that do not exist, are logical, are invalidated, or are
 * inactive.
 *
 * Checking stops at the first slot that does not exist, is logical, has been
 * invalidated, or has not yet reached wait_for_lsn.
3105 */
3106bool
3108{
3109 const char *name;
3110 int caught_up_slot_num = 0;
3112
3113 /*
3114 * Don't need to wait for the standbys to catch up if there is no value in
3115 * synchronized_standby_slots.
3116 */
3118 return true;
3119
3120 /*
3121 * Don't need to wait for the standbys to catch up if we are on a standby
3122 * server, since we do not support syncing slots to cascading standbys.
3123 */
3124 if (RecoveryInProgress())
3125 return true;
3126
3127 /*
3128 * Don't need to wait for the standbys to catch up if they are already
3129 * beyond the specified WAL location.
3130 */
3133 return true;
3134
3135 /*
3136 * To prevent concurrent slot dropping and creation while filtering the
3137 * slots, take the ReplicationSlotControlLock outside of the loop.
3138 */
3140
3142 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3143 {
3144 XLogRecPtr restart_lsn;
3145 bool invalidated;
3146 bool inactive;
3147 ReplicationSlot *slot;
3148
3149 slot = SearchNamedReplicationSlot(name, false);
3150
3151 /*
3152 * If a slot name provided in synchronized_standby_slots does not
3153 * exist, report a message and exit the loop.
3154 */
3155 if (!slot)
3156 {
3157 ereport(elevel,
3159 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3160 name, "synchronized_standby_slots"),
3161 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3162 name),
3163 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3164 name, "synchronized_standby_slots"));
3165 break;
3166 }
3167
3168 /* Same as above: if a slot is not physical, exit the loop. */
3169 if (SlotIsLogical(slot))
3170 {
3171 ereport(elevel,
3173 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3174 name, "synchronized_standby_slots"),
3175 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3176 name),
3177 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3178 name, "synchronized_standby_slots"));
3179 break;
3180 }
3181
 /* Read the slot's state under its mutex; act on the copies afterwards */
3182 SpinLockAcquire(&slot->mutex);
3183 restart_lsn = slot->data.restart_lsn;
3184 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3186 SpinLockRelease(&slot->mutex);
3187
3188 if (invalidated)
3189 {
3190 /* Specified physical slot has been invalidated */
3191 ereport(elevel,
3193 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3194 name, "synchronized_standby_slots"),
3195 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3196 name),
3197 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3198 name, "synchronized_standby_slots"));
3199 break;
3200 }
3201
3202 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3203 {
3204 /* Log a message if this physical slot is inactive */
3205 if (inactive)
3206 ereport(elevel,
3208 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3209 name, "synchronized_standby_slots"),
3210 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3211 name),
3212 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3213 name, "synchronized_standby_slots"));
3214
3215 /* The current slot hasn't caught up, so stop checking. */
3216 break;
3217 }
3218
3219 Assert(restart_lsn >= wait_for_lsn);
3220
3222 min_restart_lsn > restart_lsn)
3223 min_restart_lsn = restart_lsn;
3224
3226
 /* Advance to the next name; names are stored back to back */
3227 name += strlen(name) + 1;
3228 }
3229
3231
3232 /*
3233 * Return false if not all the standbys have caught up to the specified
3234 * WAL location.
3235 */
3237 return false;
3238
3239 /* The ss_oldest_flush_lsn must not retreat. */
3242
3244
3245 return true;
3246}
3247
3248/*
3249 * Wait for physical standbys to confirm receiving the given lsn.
3250 *
3251 * Used by logical decoding SQL functions. It waits for physical standbys
3252 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3253 */
3254void
3256{
3257 /*
3258 * Don't need to wait for the standby to catch up if the currently acquired
3259 * slot is not a logical failover slot, or there is no value in
3260 * synchronized_standby_slots.
3261 */
3263 return;
3264
3266
3267 for (;;)
3268 {
3270
3272 {
3273 ConfigReloadPending = false;
3275 }
3276
3277 /* Exit if done waiting for every slot. */
3279 break;
3280
3281 /*
3282 * Wait for the slots listed in synchronized_standby_slots to catch up,
3283 * but use a timeout (1s) so that we also notice when
3284 * synchronized_standby_slots has been changed.
3285 */
3288 }
3289
3291}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition timestamp.c:1729
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition timestamp.c:1803
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
#define NameStr(name)
Definition c.h:835
#define ngettext(s, p, n)
Definition c.h:1270
#define Assert(condition)
Definition c.h:943
#define PG_BINARY
Definition c.h:1374
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
uint64_t uint64
Definition c.h:625
#define pg_unreachable()
Definition c.h:367
uint32_t uint32
Definition c.h:624
#define lengthof(array)
Definition c.h:873
#define StaticAssertDecl(condition, errmessage)
Definition c.h:1008
uint32 TransactionId
Definition c.h:736
size_t Size
Definition c.h:689
uint32 result
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1323
int errcode_for_file_access(void)
Definition elog.c:898
int errcode(int sqlerrcode)
Definition elog.c:875
#define _(x)
Definition elog.c:96
#define LOG
Definition elog.h:32
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define FATAL
Definition elog.h:42
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define PANIC
Definition elog.h:44
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
int int errhint_internal(const char *fmt,...) pg_attribute_printf(1
int MakePGDirectory(const char *directoryName)
Definition fd.c:3963
int FreeDir(DIR *dir)
Definition fd.c:3009
int CloseTransientFile(int fd)
Definition fd.c:2855
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:757
DIR * AllocateDir(const char *dirname)
Definition fd.c:2891
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2957
int pg_fsync(int fd)
Definition fd.c:390
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_DIR
Definition file_utils.h:23
@ PGFILETYPE_ERROR
Definition file_utils.h:20
bool IsBinaryUpgrade
Definition globals.c:123
ProcNumber MyProcNumber
Definition globals.c:92
bool IsUnderPostmaster
Definition globals.c:122
Oid MyDatabaseId
Definition globals.c:96
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
void GUC_check_errcode(int sqlerrcode)
Definition guc.c:6666
void * guc_malloc(int elevel, size_t size)
Definition guc.c:637
#define newval
#define GUC_check_errdetail
Definition guc.h:507
GucSource
Definition guc.h:112
@ PGC_SIGHUP
Definition guc.h:75
#define GUC_check_errhint
Definition guc.h:511
#define IS_INJECTION_POINT_ATTACHED(name)
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int i
Definition isn.c:77
bool IsLogicalLauncher(void)
Definition launcher.c:1588
void list_free(List *list)
Definition list.c:1546
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:431
bool LWLockHeldByMe(LWLock *lock)
Definition lwlock.c:1885
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1929
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:670
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
Size mul_size(Size s1, Size s2)
Definition mcxt.c:1752
#define START_CRIT_SECTION()
Definition miscadmin.h:152
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
@ B_STARTUP
Definition miscadmin.h:368
#define END_CRIT_SECTION()
Definition miscadmin.h:154
Oid GetUserId(void)
Definition miscinit.c:470
BackendType MyBackendType
Definition miscinit.c:65
bool has_rolreplication(Oid roleid)
Definition miscinit.c:689
void namestrcpy(Name name, const char *str)
Definition name.c:233
static char * errmsg
#define ERRCODE_DATA_CORRUPTED
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:177
#define EQ_CRC32C(c1, c2)
Definition pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:182
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static bool two_phase
static bool failover
static rewind_source * source
Definition pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition port.h:263
#define snprintf
Definition port.h:261
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
#define GetPGProcByNumber(n)
Definition proc.h:504
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition procarray.c:3942
bool SignalRecoveryConflict(PGPROC *proc, pid_t pid, RecoveryConflictReason reason)
Definition procarray.c:3446
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50
#define ShmemRequestStruct(...)
Definition shmem.h:176
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition slot.c:581
const ShmemCallbacks ReplicationSlotsShmemCallbacks
Definition slot.c:152
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:629
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition slot.c:1974
static void ReplicationSlotsShmemInit(void *arg)
Definition slot.c:221
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition slot.c:115
char * synchronized_standby_slots
Definition slot.c:176
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition slot.c:3059
#define ReplicationSlotOnDiskChecksummedSize
Definition slot.c:137
void CheckPointReplicationSlots(bool is_shutdown)
Definition slot.c:2318
int idle_replication_slot_timeout_secs
Definition slot.c:170
void ReplicationSlotMarkDirty(void)
Definition slot.c:1178
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition slot.c:1882
void ReplicationSlotReserveWal(void)
Definition slot.c:1705
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition slot.c:1451
static XLogRecPtr ss_oldest_flush_lsn
Definition slot.c:185
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:310
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition slot.c:1512
#define ReplicationSlotOnDiskNotChecksummedSize
Definition slot.c:134
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1372
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition slot.c:2930
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1220
static void RestoreSlotFromDisk(const char *name)
Definition slot.c:2681
void ReplicationSlotPersist(void)
Definition slot.c:1195
bool CheckLogicalSlotExists(void)
Definition slot.c:1617
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition slot.c:1784
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition slot.c:2518
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:913
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3074
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition slot.c:2969
void ReplicationSlotSave(void)
Definition slot.c:1160
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:548
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition slot.c:2457
#define ReplicationSlotOnDiskV2Size
Definition slot.c:140
void CheckSlotPermissions(void)
Definition slot.c:1688
bool ReplicationSlotName(int index, Name name)
Definition slot.c:598
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition slot.c:3003
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:265
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:944
void ReplicationSlotRelease(void)
Definition slot.c:769
int max_replication_slots
Definition slot.c:161
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:147
#define SLOT_VERSION
Definition slot.c:144
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition slot.c:3255
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3107
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1302
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:859
int max_repack_replication_slots
Definition slot.c:163
void ReplicationSlotInitialize(void)
Definition slot.c:240
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition slot.c:1053
void StartupReplicationSlots(void)
Definition slot.c:2396
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition slot.c:1866
void CheckSlotRequirements(bool repack)
Definition slot.c:1659
void ReplicationSlotDropAcquired(bool try_disable)
Definition slot.c:1029
#define SLOT_MAGIC
Definition slot.c:143
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition slot.c:2214
static void ReplicationSlotsShmemRequest(void *arg)
Definition slot.c:200
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition slot.c:179
#define ReplicationSlotOnDiskConstantSize
Definition slot.c:131
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2950
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition slot.c:249
static bool IsSlotForConflictCheck(const char *name)
Definition slot.c:360
#define CONFLICT_DETECTION_SLOT
Definition slot.h:28
#define RS_INVAL_MAX_CAUSES
Definition slot.h:72
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
#define SlotIsPhysical(slot)
Definition slot.h:287
#define PG_REPLSLOT_DIR
Definition slot.h:21
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60
#define SlotIsLogical(slot)
Definition slot.h:288
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:306
@ SS_SKIP_NONE
Definition slot.h:82
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1916
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
PGPROC * MyProc
Definition proc.c:71
PROC_HDR * ProcGlobal
Definition proc.c:74
XLogRecPtr LogStandbySnapshot(void)
Definition standby.c:1284
@ RECOVERY_CONFLICT_LOGICALSLOT
Definition standby.h:46
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition dirent.c:26
Definition pg_list.h:54
uint8 statusFlags
Definition proc.h:210
int pgxactoff
Definition proc.h:207
uint8 * statusFlags
Definition proc.h:456
ReplicationSlot replication_slots[1]
Definition slot.h:299
ReplicationSlotPersistentData slotdata
Definition slot.c:85
pg_crc32c checksum
Definition slot.c:74
ReplicationSlotPersistency persistency
Definition slot.h:106
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
XLogRecPtr candidate_xmin_lsn
Definition slot.h:229
TransactionId effective_catalog_xmin
Definition slot.h:210
slock_t mutex
Definition slot.h:183
XLogRecPtr candidate_restart_valid
Definition slot.h:230
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:238
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:284
bool in_use
Definition slot.h:186
TransactionId effective_xmin
Definition slot.h:209
bool just_dirtied
Definition slot.h:195
XLogRecPtr last_saved_restart_lsn
Definition slot.h:271
XLogRecPtr candidate_restart_lsn
Definition slot.h:231
LWLock io_in_progress_lock
Definition slot.h:216
ConditionVariable active_cv
Definition slot.h:219
TransactionId candidate_catalog_xmin
Definition slot.h:228
ProcNumber active_proc
Definition slot.h:192
ReplicationSlotPersistentData data
Definition slot.h:213
TimestampTz inactive_since
Definition slot.h:245
ShmemRequestCallback request_fn
Definition shmem.h:133
const char * cause_name
Definition slot.c:112
ReplicationSlotInvalidationCause cause
Definition slot.c:111
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition slot.c:103
ConditionVariable wal_confirm_rcv_cv
Definition type.h:97
Definition c.h:830
unsigned short st_mode
Definition win32_port.h:258
#define InvalidTransactionId
Definition transam.h:31
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define TransactionIdIsValid(xid)
Definition transam.h:41
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition varlena.c:2870
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
const char * name
bool am_walsender
Definition walsender.c:135
bool log_replication_commands
Definition walsender.c:150
WalSndCtlData * WalSndCtl
Definition walsender.c:121
#define stat
Definition win32_port.h:74
#define S_ISDIR(m)
Definition win32_port.h:315
#define kill(pid, sig)
Definition win32_port.h:490
bool RecoveryInProgress(void)
Definition xlog.c:6834
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3809
bool EnableHotStandby
Definition xlog.c:128
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6937
int wal_level
Definition xlog.c:138
int wal_segment_size
Definition xlog.c:150
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition xlog.c:2687
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:10094
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
@ WAL_LEVEL_REPLICA
Definition xlog.h:77
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint64 XLogSegNo
Definition xlogdefs.h:52
bool StandbyMode
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)