PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the directory
24 * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 * contain the slot's own data. Additional data can be stored alongside that
26 * file if required. While the server is running, the state data is also
27 * cached in memory for efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include <unistd.h>
40#include <sys/stat.h>
41
42#include "access/transam.h"
44#include "access/xlogrecovery.h"
45#include "common/file_utils.h"
46#include "common/string.h"
47#include "miscadmin.h"
48#include "pgstat.h"
52#include "replication/slot.h"
54#include "storage/fd.h"
55#include "storage/ipc.h"
56#include "storage/proc.h"
57#include "storage/procarray.h"
58#include "utils/builtins.h"
59#include "utils/guc_hooks.h"
61#include "utils/varlena.h"
62
63/*
64 * Replication slot on-disk data structure.
 *
 * The size macros defined after this struct use
 * offsetof(ReplicationSlotOnDisk, version) and
 * offsetof(ReplicationSlotOnDisk, slotdata): every byte before 'version' is
 * excluded from the checksum, and every byte before 'slotdata' belongs to the
 * version-independent header.
 *
 * NOTE(review): source lines 66, 71-72, 75-76 and 83-84 are missing from this
 * extract (the struct's opening declaration, its member declarations and,
 * presumably, the closing typedef name); restore them from upstream before
 * compiling.
65 */
67{
68 /* first part of this struct needs to be version independent */
69
70 /* data not covered by checksum */
73
74 /* data covered by checksum */
77
78 /*
79 * The actual data in the slot that follows can differ based on the above
80 * 'version'.
81 */
82
85
86/*
87 * Struct for the configuration of synchronized_standby_slots.
88 *
89 * Note: this must be a flat representation that can be held in a single chunk
90 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
91 * synchronized_standby_slots GUC.
 *
 * Layout: a count of names followed by slot_names[], a flexible array that
 * holds that many NUL-terminated strings packed back to back.
 *
 * NOTE(review): source line 96 (the count member, called 'nslotnames' below)
 * and line 102 (presumably the closing brace and typedef name) are missing
 * from this extract.
92 */
93typedef struct
94{
95 /* Number of slot names in the slot_names[] */
97
98 /*
99 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
100 */
101 char slot_names[FLEXIBLE_ARRAY_MEMBER];
103
104/*
105 * Lookup table for slot invalidation causes.
 *
 * Each entry pairs a ReplicationSlotInvalidationCause value with its string
 * name; note that RS_INVAL_HORIZON maps to "rows_removed", not to a name
 * derived from the enum constant.
 *
 * NOTE(review): source lines 107-111 and 113 (presumably the entry type and
 * the array declaration) and line 125 (the start of the static assertion
 * whose message survives below) are missing from this extract.
106 */
112
114 {RS_INVAL_NONE, "none"},
115 {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 {RS_INVAL_HORIZON, "rows_removed"},
117 {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119};
120
121/*
122 * Ensure that the lookup table is up-to-date with the enums defined in
123 * ReplicationSlotInvalidationCause.
124 */
126 "array length mismatch");
127
/*
 * Size helpers for ReplicationSlotOnDisk.
 *
 * The two subtraction-based macros are fully parenthesized so that they behave
 * as a single value wherever they are expanded.  Without the outer parentheses
 * an expression such as "ReplicationSlotOnDiskV2Size / 8" or
 * "2 * ReplicationSlotOnDiskChecksummedSize" would bind to only one operand of
 * the subtraction and silently compute the wrong size.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5			/* version for new files */
143
/*
 * File-level state: the shared slot control array, this backend's acquired
 * slot, and GUC-backed settings.
 *
 * NOTE(review): the declarations on source lines 145, 148, 158, 164, 167 and
 * 173 are missing from this extract; only max_replication_slots survives.
 */
144/* Control array for replication slot management */
146 /* NOTE(review): presumably declares ReplicationSlotCtl, referenced later as ReplicationSlotCtl->replication_slots */
147/* My backend's replication slot in the shared memory array */
149 /* NOTE(review): presumably declares MyReplicationSlot, which later code reads and assigns */
150/* GUC variables */
151int max_replication_slots = 10; /* the maximum number of replication
152 * slots */
153
154/*
155 * Invalidate replication slots that have remained idle longer than this
156 * duration; '0' disables it.
157 */
159 /* NOTE(review): declaration missing from this extract */
160/*
161 * This GUC lists streaming replication standby server slot names that
162 * logical WAL sender processes will wait for.
163 */
165 /* NOTE(review): declaration missing from this extract */
166/* This is the parsed and cached configuration for synchronized_standby_slots */
168 /* NOTE(review): declaration missing; presumably a pointer to the parsed configuration struct */
169/*
170 * Oldest LSN that has been confirmed to be flushed to the standbys
171 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
172 */
174 /* NOTE(review): declaration missing from this extract */
175static void ReplicationSlotShmemExit(int code, Datum arg);
176static bool IsSlotForConflictCheck(const char *name);
177static void ReplicationSlotDropPtr(ReplicationSlot *slot);
178
179/* internal persistency functions */
180static void RestoreSlotFromDisk(const char *name);
181static void CreateSlotOnDisk(ReplicationSlot *slot);
182static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183
184/*
185 * Report shared-memory space needed by ReplicationSlotsShmemInit.
 *
 * Returns 0 when max_replication_slots is 0 (no shared state is needed).
 * Otherwise starts from the size of the ReplicationSlotCtlData header, i.e.
 * the offset of its replication_slots member, and extends it with
 * add_size().
 *
 * NOTE(review): source lines 188 (the signature) and 197 (the remaining
 * add_size() argument, presumably the per-slot array size) are missing from
 * this extract.
186 */
187Size
189{
190 Size size = 0;
191
192 if (max_replication_slots == 0) /* no slots configured: nothing to reserve */
193 return size;
194
195 size = offsetof(ReplicationSlotCtlData, replication_slots);
196 size = add_size(size,
198
199 return size;
200}
201
202/*
203 * Allocate and initialize shared memory for replication slots.
 *
 * Does nothing when max_replication_slots is 0, mirroring
 * ReplicationSlotsShmemSize(). Otherwise requests the "ReplicationSlot Ctl"
 * struct from ShmemInitStruct(). Per-slot initialization, including
 * SpinLockInit() on each slot's mutex, runs only when 'found' comes back
 * false, i.e. the first time through.
 *
 * NOTE(review): source lines 206, 213, 222, 226, 229 and 231-233 are missing
 * from this extract. They include the variable that receives the
 * ShmemInitStruct() result and the memset that the in-loop comment refers to.
204 */
205void
207{
208 bool found;
209
210 if (max_replication_slots == 0)
211 return;
212
214 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 &found);
216
217 if (!found) /* only the first caller initializes the array */
218 {
219 int i;
220
221 /* First time through, so initialize */
223
224 for (i = 0; i < max_replication_slots; i++)
225 {
227
228 /* everything else is zeroed by the memset above */
230 SpinLockInit(&slot->mutex);
234 }
235 }
236}
237
238/*
239 * Register the callback for replication slot cleanup and releasing.
 *
 * NOTE(review): the function name and the whole body (source lines 242-245)
 * are missing from this extract. Presumably it registers the static exit
 * callback ReplicationSlotShmemExit declared near the top of the file;
 * confirm against upstream.
240 */
241void
246
247/*
248 * Release and cleanup replication slots.
 *
 * Per the comments in the body: if this backend still holds
 * MyReplicationSlot, that slot is released first; then all temporary slots
 * are cleaned up.
 *
 * NOTE(review): source lines 251 (the signature; the prototype near the top
 * of the file is ReplicationSlotShmemExit(int code, Datum arg)), 255 and 258
 * (the two calls) are missing from this extract.
249 */
250static void
252{
253 /* Make sure active replication slots are released */
254 if (MyReplicationSlot != NULL)
256
257 /* Also cleanup all the temporary slots. */
259}
260
261/*
262 * Check whether the passed slot name is valid and report errors at elevel.
263 *
264 * See comments for ReplicationSlotValidateNameInternal().
 *
 * Returns true for a valid name. Otherwise:
 * - the already-translated message and optional hint produced by the internal
 *   check are reported with ereport(elevel, ...);
 * - err_msg is pfree'd (err_hint presumably too, on the missing line 290);
 * - false is returned.
 *
 * NOTE(review): when elevel is ERROR or higher, ereport() presumably does not
 * return, so "return false" is only reached for lower levels. Source lines
 * 267, 274, 284 and 290 are missing from this extract.
265 */
266bool
268 int elevel)
269{
270 int err_code;
271 char *err_msg = NULL;
272 char *err_hint = NULL;
273
275 &err_code, &err_msg, &err_hint))
276 {
277 /*
278 * Use errmsg_internal() and errhint_internal() instead of errmsg()
279 * and errhint(), since the messages from
280 * ReplicationSlotValidateNameInternal() are already translated. This
281 * avoids double translation.
282 */
283 ereport(elevel,
285 errmsg_internal("%s", err_msg),
286 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
287
288 pfree(err_msg);
289 if (err_hint != NULL)
291 return false;
292 }
293
294 return true;
295}
296
297/*
298 * Check whether the passed slot name is valid.
299 *
300 * A reserved replication slot name is treated as invalid if
301 * allow_reserved_name is set to false.
302 *
303 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
304 * the name to be used as a directory name on every supported OS.
305 *
306 * Returns true if the slot name is valid. Otherwise, returns false and stores
307 * the error code, error message, and optional hint in err_code, err_msg, and
308 * err_hint, respectively. The caller is responsible for freeing err_msg and
309 * err_hint, which are palloc'd.
 *
 * err_hint is set to NULL for the too-short and too-long cases, so callers
 * must check it before freeing. Messages are passed through _() here, so they
 * are already translated when returned.
 *
 * NOTE(review): source lines 312 (the first signature line), 319, 327, 339,
 * 348 (presumably the *err_code assignments), 346 (the reserved-name test)
 * and 351 (the last psprintf() argument) are missing from this extract.
310 */
311bool
313 int *err_code, char **err_msg, char **err_hint)
314{
315 const char *cp;
316
317 if (strlen(name) == 0)
318 {
320 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
321 *err_hint = NULL;
322 return false;
323 }
324
325 if (strlen(name) >= NAMEDATALEN) /* must leave room for the terminator */
326 {
328 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
329 *err_hint = NULL;
330 return false;
331 }
332
333 for (cp = name; *cp; cp++) /* reject anything outside [a-z0-9_] */
334 {
335 if (!((*cp >= 'a' && *cp <= 'z')
336 || (*cp >= '0' && *cp <= '9')
337 || (*cp == '_')))
338 {
340 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
341 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
342 return false;
343 }
344 }
345
347 {
349 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
350 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
352 return false;
353 }
354
355 return true;
356}
357
358/*
359 * Return true if the replication slot name is "pg_conflict_detection".
 *
 * The comparison is an exact, case-sensitive strcmp() against
 * CONFLICT_DETECTION_SLOT.
 *
 * NOTE(review): source line 362 (the parameter list) is missing from this
 * extract; the prototype near the top of the file declares it as
 * (const char *name).
360 */
361static bool
363{
364 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
365}
366
367/*
368 * Create a new replication slot and mark it as used by this backend.
369 *
370 * name: Name of the slot
371 * db_specific: logical decoding is db specific; if the slot is going to
372 * be used for that pass true, otherwise false.
 * persistency: stored as slot->data.persistency (e.g. RS_TEMPORARY,
 * RS_EPHEMERAL or RS_PERSISTENT).
373 * two_phase: If enabled, allows decoding of prepared transactions.
374 * failover: If enabled, allows the slot to be synced to standbys so
375 * that logical replication can be resumed after failover.
376 * synced: True if the slot is synchronized from the primary server.
 *
 * Visible flow:
 * 1. Validate the name; reserved names are allowed only for the internal
 *    callers named below.
 * 2. Enforce the failover restrictions.
 * 3. While holding ReplicationSlotAllocationLock, scan for a name collision
 *    and for the first unused entry, and initialize that entry.
 * 4. Create the slot on disk.
 * 5. Mark the slot in_use and make it this backend's MyReplicationSlot.
 * Logical slots also get a statistics entry before the allocation lock is
 * released.
 *
 * Failure cases visible here (their ereport() headers are missing; presumably
 * ERROR): failover on a standby outside slot synchronization, failover on a
 * temporary slot outside slot synchronization, a duplicate name, and no free
 * slot.
 *
 * NOTE(review): many source lines are missing from this extract (379, 386,
 * 392, 405-407, 418-419, 430, 437, 440, 443-444, 449, 453-454, 465, 470, 473,
 * 480-487, 489, 503, 509-510, 514, 523, 529, 532). They include the
 * signature's first line, all lock acquire/release calls and several field
 * assignments.
377 */
378void
380 ReplicationSlotPersistency persistency,
381 bool two_phase, bool failover, bool synced)
382{
383 ReplicationSlot *slot = NULL;
384 int i;
385
387
388 /*
389 * The logical launcher or pg_upgrade may create or migrate an internal
390 * slot, so using a reserved name is allowed in these cases.
391 */
393 ERROR);
394
395 if (failover)
396 {
397 /*
398 * Do not allow users to create the failover enabled slots on the
399 * standby as we do not support sync to the cascading standby.
400 *
401 * However, failover enabled slots can be created during slot
402 * synchronization because we need to retain the same values as the
403 * remote slot.
404 */
408 errmsg("cannot enable failover for a replication slot created on the standby"));
409
410 /*
411 * Do not allow users to create failover enabled temporary slots,
412 * because temporary slots will not be synced to the standby.
413 *
414 * However, failover enabled temporary slots can be created during
415 * slot synchronization. See the comments atop slotsync.c for details.
416 */
417 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
420 errmsg("cannot enable failover for a temporary replication slot"));
421 }
422
423 /*
424 * If some other backend ran this code concurrently with us, we'd likely
425 * both allocate the same slot, and that would be bad. We'd also be at
426 * risk of missing a name collision. Also, we don't want to try to create
427 * a new slot while somebody's busy cleaning up an old one, because we
428 * might both be monkeying with the same directory.
429 */
431
432 /*
433 * Check for name collision, and identify an allocatable slot. We need to
434 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
435 * else can change the in_use flags while we're looking at them.
436 */
438 for (i = 0; i < max_replication_slots; i++)
439 {
441
442 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
445 errmsg("replication slot \"%s\" already exists", name)));
446 if (!s->in_use && slot == NULL) /* remember the first free entry */
447 slot = s;
448 }
450
451 /* If all slots are in use, we're out of luck. */
452 if (slot == NULL)
455 errmsg("all replication slots are in use"),
456 errhint("Free one or increase \"max_replication_slots\".")));
457
458 /*
459 * Since this slot is not in use, nobody should be looking at any part of
460 * it other than the in_use field unless they're trying to allocate it.
461 * And since we hold ReplicationSlotAllocationLock, nobody except us can
462 * be doing that. So it's safe to initialize the slot.
463 */
464 Assert(!slot->in_use);
466
467 /* first initialize persistent data */
468 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
469 namestrcpy(&slot->data.name, name);
471 slot->data.persistency = persistency;
472 slot->data.two_phase = two_phase;
474 slot->data.failover = failover;
475 slot->data.synced = synced;
476
477 /* and then data only present in shared memory */
478 slot->just_dirtied = false;
479 slot->dirty = false;
488 slot->inactive_since = 0;
490
491 /*
492 * Create the slot on disk. We haven't actually marked the slot allocated
493 * yet, so no special cleanup is required if this errors out.
494 */
495 CreateSlotOnDisk(slot);
496
497 /*
498 * We need to briefly prevent any other backend from iterating over the
499 * slots while we flip the in_use flag. We also need to set the active
500 * flag while holding the ControlLock as otherwise a concurrent
501 * ReplicationSlotAcquire() could acquire the slot as well.
502 */
504
505 slot->in_use = true;
506
507 /* We can now mark the slot active, and that makes it our slot. */
508 SpinLockAcquire(&slot->mutex);
511 SpinLockRelease(&slot->mutex);
512 MyReplicationSlot = slot;
513
515
516 /*
517 * Create statistics entry for the new logical slot. We don't collect any
518 * stats for physical slots, so no need to create an entry for the same.
519 * See ReplicationSlotDropPtr for why we need to do this before releasing
520 * ReplicationSlotAllocationLock.
521 */
522 if (SlotIsLogical(slot))
524
525 /*
526 * Now that the slot has been marked as in_use and active, it's safe to
527 * let somebody else try to allocate a slot.
528 */
530
531 /* Let everybody know we've modified this slot */
533}
534
535/*
536 * Search for the named replication slot.
537 *
538 * Return the replication slot if found, otherwise NULL.
 *
 * Performs a linear scan over all max_replication_slots entries, matching
 * only in_use slots by exact (strcmp) name. When need_lock is true, the
 * function brackets the scan with its own lock calls (source lines 547 and
 * 561, missing here). Callers passing false presumably already hold
 * ReplicationSlotControlLock.
 *
 * NOTE(review): source lines 540-541 (the signature), 547, 551 (presumably the
 * declaration of 's') and 561 are missing from this extract.
539 */
542{
543 int i;
544 ReplicationSlot *slot = NULL;
545
546 if (need_lock)
548
549 for (i = 0; i < max_replication_slots; i++)
550 {
552
553 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
554 {
555 slot = s;
556 break;
557 }
558 }
559
560 if (need_lock)
562
563 return slot;
564}
565
566/*
567 * Return the index of the replication slot in
568 * ReplicationSlotCtl->replication_slots.
569 *
570 * This is mainly useful to have an efficient key for storing replication slot
571 * stats.
 *
 * NOTE(review): the function name and body (source lines 574-580) are missing
 * from this extract; nothing beyond the int return type can be confirmed here.
572 */
573int
581
582/*
583 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
584 * the slot's name and true is returned.
585 *
586 * This likely is only useful for pgstat_replslot.c during shutdown, in other
587 * cases there are obvious TOCTOU issues.
 *
 * 'found' is a snapshot of slot->in_use, taken while the slot is protected
 * against being dropped; the name is copied only in that case.
 *
 * NOTE(review): the following source lines are missing from this extract:
 * 590 (the signature), 595 (presumably selecting the slot by index), and
 * 601 and 604-605 (presumably the lock acquire, the name copy and the lock
 * release).
588 */
589bool
591{
592 ReplicationSlot *slot;
593 bool found;
594
596
597 /*
598 * Ensure that the slot cannot be dropped while we copy the name. Don't
599 * need the spinlock as the name of an existing slot cannot change.
600 */
602 found = slot->in_use;
603 if (slot->in_use)
606
607 return found;
608}
609
610/*
611 * Find a previously created slot and mark it as used by this process.
612 *
613 * An error is raised if nowait is true and the slot is currently in use. If
614 * nowait is false, we sleep until the slot is released by the owning process.
615 *
616 * An error is raised if error_if_invalid is true and the slot is found to
617 * be invalid. It should always be set to true, except when we are temporarily
618 * acquiring the slot and don't intend to change it.
 *
 * On success the slot's active_proc is MyProcNumber and (per the comment
 * near the end) logical slots have their stats entry acquired. Presumably
 * MyReplicationSlot is set on missing line 716.
 *
 * If another process owns the slot and nowait is false, this sleeps on a
 * condition variable and restarts from the top ("goto retry").
 *
 * Invalidation is checked only after the slot is ours, to avoid racing with
 * the checkpointer.
 *
 * NOTE(review): numerous source lines (623, 630, 632, 635, 638, 640-641,
 * 651-653, 661, 669, 676-678, 680-681, 686, 689, 701-703, 707-708, 716,
 * 723-725, 729, 732, 740, 745-746) are missing from this extract. They
 * include the lock calls, the declaration of 's', and several ereport()
 * headers.
619 */
620void
621ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
622{
624 ProcNumber active_proc;
625 int active_pid;
626
627 Assert(name != NULL);
628
629retry: /* re-entered after waiting for the current owner to release the slot */
631
633
634 /* Check if the slot exists with the given name. */
636 if (s == NULL || !s->in_use)
637 {
639
642 errmsg("replication slot \"%s\" does not exist",
643 name)));
644 }
645
646 /*
647 * Do not allow users to acquire the reserved slot. This scenario may
648 * occur if the launcher that owns the slot has terminated unexpectedly
649 * due to an error, and a backend process attempts to reuse the slot.
650 */
654 errmsg("cannot acquire replication slot \"%s\"", name),
655 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
656
657 /*
658 * This is the slot we want; check if it's active under some other
659 * process. In single user mode, we don't need this check.
660 */
662 {
663 /*
664 * Get ready to sleep on the slot in case it is active. (We may end
665 * up not sleeping, but we don't want to do this while holding the
666 * spinlock.)
667 */
668 if (!nowait)
670
671 /*
672 * It is important to reset the inactive_since under spinlock here to
673 * avoid race conditions with slot invalidation. See comments related
674 * to inactive_since in InvalidatePossiblyObsoleteSlot.
675 */
679 active_proc = s->active_proc;
682 }
683 else
684 {
685 s->active_proc = active_proc = MyProcNumber;
687 }
688 active_pid = GetPGProcByNumber(active_proc)->pid; /* used only in the error message below */
690
691 /*
692 * If we found the slot but it's already active in another process, we
693 * wait until the owning process signals us that it's been released, or
694 * error out.
695 */
696 if (active_proc != MyProcNumber)
697 {
698 if (!nowait)
699 {
700 /* Wait here until we get signaled, and then restart */
704 goto retry;
705 }
706
709 errmsg("replication slot \"%s\" is active for PID %d",
710 NameStr(s->data.name), active_pid)));
711 }
712 else if (!nowait)
713 ConditionVariableCancelSleep(); /* no sleep needed after all */
714
715 /* We made this slot active, so it's ours now. */
717
718 /*
719 * We need to check for invalidation after making the slot ours to avoid
720 * the possible race condition with the checkpointer that can otherwise
721 * invalidate the slot immediately after the check.
722 */
726 errmsg("can no longer access replication slot \"%s\"",
727 NameStr(s->data.name)),
728 errdetail("This replication slot has been invalidated due to \"%s\".",
730
731 /* Let everybody know we've modified this slot */
733
734 /*
735 * The call to pgstat_acquire_replslot() protects against stats for a
736 * different slot, from before a restart or such, being present during
737 * pgstat_report_replslot().
738 */
739 if (SlotIsLogical(s))
741
742
743 if (am_walsender)
744 {
747 ? errmsg("acquired logical replication slot \"%s\"",
748 NameStr(s->data.name))
749 : errmsg("acquired physical replication slot \"%s\"",
750 NameStr(s->data.name)));
751 }
752}
753
754/*
755 * Release the replication slot that this backend considers to own.
756 *
757 * This or another backend can re-acquire the slot later.
758 * Resources this slot requires will be preserved.
 *
 * Visible behavior:
 * - Ephemeral slots are dropped outright; for logical ones, disabling
 *   logical decoding is also requested.
 * - A temporary xmin kept only for building the initial snapshot is cleared.
 * - The current time is read before taking the spinlock (line 812, missing),
 *   presumably so persistent slots can be marked inactive with an
 *   inactive_since timestamp; waiters are woken.
 * - For walsenders, the slot name is copied up front so it can still be
 *   logged after release.
 *
 * NOTE(review): source lines 761, 763, 770, 782, 790, 800, 803, 805, 812,
 * 821-822, 824, 827, 829, 832-835 and 839-840 are missing from this extract.
759 */
760void
762{
764 char *slotname = NULL; /* keep compiler quiet */
765 bool is_logical;
766 TimestampTz now = 0;
767
768 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
769
771
772 if (am_walsender) /* copied now: the name is logged after the slot is released */
773 slotname = pstrdup(NameStr(slot->data.name));
774
775 if (slot->data.persistency == RS_EPHEMERAL)
776 {
777 /*
778 * Delete the slot. There is no !PANIC case where this is allowed to
779 * fail, all that may happen is an incomplete cleanup of the on-disk
780 * data.
781 */
783
784 /*
785 * Request to disable logical decoding, even though this slot may not
786 * have been the last logical slot. The checkpointer will verify if
787 * logical decoding should actually be disabled.
788 */
789 if (is_logical)
791 }
792
793 /*
794 * If slot needed to temporarily restrain both data and catalog xmin to
795 * create the catalog snapshot, remove that temporary constraint.
796 * Snapshots can only be exported while the initial snapshot is still
797 * acquired.
798 */
799 if (!TransactionIdIsValid(slot->data.xmin) &&
801 {
802 SpinLockAcquire(&slot->mutex);
804 SpinLockRelease(&slot->mutex);
806 }
807
808 /*
809 * Set the time since the slot has become inactive. We get the current
810 * time beforehand to avoid system call while holding the spinlock.
811 */
813
814 if (slot->data.persistency == RS_PERSISTENT)
815 {
816 /*
817 * Mark persistent slot inactive. We're not freeing it, just
818 * disconnecting, but wake up others that may be waiting for it.
819 */
820 SpinLockAcquire(&slot->mutex);
823 SpinLockRelease(&slot->mutex);
825 }
826 else
828
830
831 /* might not have been set when we've been a plain slot */
836
837 if (am_walsender)
838 {
841 ? errmsg("released logical replication slot \"%s\"",
842 slotname)
843 : errmsg("released physical replication slot \"%s\"",
844 slotname));
845
846 pfree(slotname);
847 }
848}
849
850/*
851 * Cleanup temporary slots created in current session.
852 *
853 * Cleanup only synced temporary slots if 'synced_only' is true, else
854 * cleanup all temporary slots.
855 *
856 * If it drops the last logical slot in the cluster, requests to disable
857 * logical decoding.
 *
 * Only slots whose active_proc is MyProcNumber are considered. After a slot
 * is dropped, ReplicationSlotControlLock has already been released (to avoid
 * deadlock), so the scan restarts from the beginning via "goto restart".
 * dropped_logical records whether any logical slot was dropped, presumably
 * for the disable-logical-decoding request made on the missing final lines.
 *
 * NOTE(review): source lines 860, 863, 866, 869-870, 873, 878, 880-881,
 * 886-887, 893, 895, 899, 902 and 904-905 are missing from this extract.
858 */
859void
861{
862 int i;
864 bool dropped_logical = false;
865
867
868restart:
871 for (i = 0; i < max_replication_slots; i++)
872 {
874
875 if (!s->in_use)
876 continue;
877
879
882
883 if ((s->active_proc == MyProcNumber &&
884 (!synced_only || s->data.synced)))
885 {
888 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
889
890 if (SlotIsLogical(s))
891 dropped_logical = true;
892
894
896 goto restart; /* the lock was released, so rescan from the start */
897 }
898 else
900 }
901
903
906}
907
908/*
909 * Permanently drop replication slot identified by the passed in name.
 *
 * Acquires the slot by name (honoring nowait, with error_if_invalid = false)
 * and refuses to drop a slot that is being synchronized from the primary.
 * The is_logical flag is presumably captured before the drop, so that a
 * request to disable logical decoding can follow for logical slots.
 *
 * NOTE(review): source lines 916, 924-926, 930, 932 and 935 are missing from
 * this extract.
910 */
911void
912ReplicationSlotDrop(const char *name, bool nowait)
913{
914 bool is_logical;
915
917
918 ReplicationSlotAcquire(name, nowait, false);
919
920 /*
921 * Do not allow users to drop the slots which are currently being synced
922 * from the primary to the standby.
923 */
927 errmsg("cannot drop replication slot \"%s\"", name),
928 errdetail("This replication slot is being synchronized from the primary server."));
929
931
933
934 if (is_logical)
936}
937
938/*
939 * Change the definition of the slot identified by the specified name.
940 *
941 * Altering the two_phase property of a slot requires caution on the
942 * client-side. Enabling it at any random point during decoding has the
943 * risk that transactions prepared before this change may be skipped by
944 * the decoder, leading to missing prepare records on the client. So, we
945 * enable it for subscription related slots only once the initial tablesync
946 * is finished. See comments atop worker.c. Disabling it is safe only when
947 * there are no pending prepared transactions; otherwise, the changes of
948 * already prepared transactions can be replicated again along with their
949 * corresponding commit leading to duplicate data or errors.
 *
 * failover and two_phase are optional. A NULL failover pointer leaves that
 * property unchanged; presumably the same holds for two_phase (its test,
 * line 1012, is missing).
 *
 * The slot is acquired with nowait = false (this may wait for the current
 * owner) and error_if_invalid = true. Physical slots are rejected. On a
 * standby, synced slots cannot be altered, and failover cannot be enabled.
 *
 * update_slot tracks whether anything actually changed, presumably so the
 * slot is only marked dirty and saved when needed.
 *
 * NOTE(review): source lines 957-958, 962-964, 974-976, 985-986, 997-999,
 * 1002, 1004-1006, 1012, 1014-1016, 1023-1024 and 1027 are missing from this
 * extract.
950 */
951void
952ReplicationSlotAlter(const char *name, const bool *failover,
953 const bool *two_phase)
954{
955 bool update_slot = false;
956
959
960 ReplicationSlotAcquire(name, false, true);
961
965 errmsg("cannot use %s with a physical replication slot",
966 "ALTER_REPLICATION_SLOT"));
967
968 if (RecoveryInProgress())
969 {
970 /*
971 * Do not allow users to alter the slots which are currently being
972 * synced from the primary to the standby.
973 */
977 errmsg("cannot alter replication slot \"%s\"", name),
978 errdetail("This replication slot is being synchronized from the primary server."));
979
980 /*
981 * Do not allow users to enable failover on the standby as we do not
982 * support sync to the cascading standby.
983 */
984 if (failover && *failover)
987 errmsg("cannot enable failover for a replication slot"
988 " on the standby"));
989 }
990
991 if (failover)
992 {
993 /*
994 * Do not allow users to enable failover for temporary slots as we do
995 * not support syncing temporary slots to the standby.
996 */
1000 errmsg("cannot enable failover for a temporary replication slot"));
1001
1003 {
1007
1008 update_slot = true;
1009 }
1010 }
1011
1013 {
1017
1018 update_slot = true;
1019 }
1020
1021 if (update_slot)
1022 {
1025 }
1026
1028}
1029
1030/*
1031 * Permanently drop the currently acquired replication slot.
 *
 * NOTE(review): the signature and most of the body (source lines 1034, 1036,
 * 1038, 1041 and 1043) are missing from this extract. Per the remaining
 * comment, MyReplicationSlot presumably stops referring to the slot once it
 * is dropped.
1032 */
1033void
1035{
1037
1039
1040 /* slot isn't acquired anymore */
1042
1044}
1045
1046/*
1047 * Permanently drop the replication slot which will be released by the point
1048 * this function returns.
 *
 * Holds ReplicationSlotAllocationLock for the whole operation (per the
 * comments below; the lock calls on lines 1061 and 1150 are missing), so a
 * concurrent create of a slot with the same name cannot interleave.
 *
 * Steps:
 * 1. Rename the slot directory to "<name>.tmp", so it is no longer
 *    recognized as a valid slot.
 * 2. If the rename fails, make the slot inactive again, wake waiters, and
 *    report the failure. The report is soft for non-persistent slots
 *    (fail_softly), presumably a WARNING.
 * 3. Otherwise mark the slot not in_use and recompute resource limits.
 * 4. Remove the .tmp directory; a failure there only warns.
 * 5. For logical slots, drop the statistics entry.
 *
 * NOTE(review): source lines 1051, 1061, 1084, 1086-1087, 1094, 1098,
 * 1100-1101, 1115-1116, 1118-1119, 1125-1126, 1134, 1144 and 1150 are
 * missing from this extract.
1049 */
1050static void
1052{
1053 char path[MAXPGPATH];
1054 char tmppath[MAXPGPATH];
1055
1056 /*
1057 * If some other backend ran this code concurrently with us, we might try
1058 * to delete a slot with a certain name while someone else was trying to
1059 * create a slot with the same name.
1060 */
1062
1063 /* Generate pathnames. */
1064 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1065 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1066
1067 /*
1068 * Rename the slot directory on disk, so that we'll no longer recognize
1069 * this as a valid slot. Note that if this fails, we've got to mark the
1070 * slot inactive before bailing out. If we're dropping an ephemeral or a
1071 * temporary slot, we better never fail hard as the caller won't expect
1072 * the slot to survive and this might get called during error handling.
1073 */
1074 if (rename(path, tmppath) == 0)
1075 {
1076 /*
1077 * We need to fsync() the directory we just renamed and its parent to
1078 * make sure that our changes are on disk in a crash-safe fashion. If
1079 * fsync() fails, we can't be sure whether the changes are on disk or
1080 * not. For now, we handle that by panicking;
1081 * StartupReplicationSlots() will try to straighten it out after
1082 * restart.
1083 */
1085 fsync_fname(tmppath, true);
1088 }
1089 else
1090 {
1091 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1092
1093 SpinLockAcquire(&slot->mutex);
1095 SpinLockRelease(&slot->mutex);
1096
1097 /* wake up anyone waiting on this slot */
1099
1102 errmsg("could not rename file \"%s\" to \"%s\": %m",
1103 path, tmppath)));
1104 }
1105
1106 /*
1107 * The slot is definitely gone. Lock out concurrent scans of the array
1108 * long enough to kill it. It's OK to clear the active PID here without
1109 * grabbing the mutex because nobody else can be scanning the array here,
1110 * and nobody can be attached to this slot and thus access it without
1111 * scanning the array.
1112 *
1113 * Also wake up processes waiting for it.
1114 */
1117 slot->in_use = false;
1120
1121 /*
1122 * Slot is dead and doesn't prevent resource removal anymore, recompute
1123 * limits.
1124 */
1127
1128 /*
1129 * If removing the directory fails, the worst thing that will happen is
1130 * that the user won't be able to create a new slot with the same name
1131 * until the next server restart. We warn about it, but that's all.
1132 */
1133 if (!rmtree(tmppath, true))
1135 (errmsg("could not remove directory \"%s\"", tmppath)));
1136
1137 /*
1138 * Drop the statistics entry for the replication slot. Do this while
1139 * holding ReplicationSlotAllocationLock so that we don't drop a
1140 * statistics entry for another slot with the same name just created in
1141 * another session.
1142 */
1143 if (SlotIsLogical(slot))
1145
1146 /*
1147 * We release this at the very end, so that nobody starts trying to create
1148 * a slot while we're still cleaning up the detritus of the old one.
1149 */
1151}
1152
1153/*
1154 * Serialize the currently acquired slot's state from memory to disk, thereby
1155 * guaranteeing the current state will survive a crash.
 *
 * NOTE(review): the signature and body (source lines 1158, 1162 and
 * 1164-1165) are missing from this extract. Only the MAXPGPATH-sized 'path'
 * buffer is visible; it is presumably filled with the slot's directory and
 * passed to SaveSlotToPath(), which is declared near the top of the file.
1156 */
1157void
1159{
1160 char path[MAXPGPATH];
1161
1163
1166}
1167
1168/*
1169 * Signal that it would be useful if the currently acquired slot would be
1170 * flushed out to disk.
1171 *
1172 * Note that the actual flush to disk can be delayed for a long time, if
1173 * required for correctness explicitly do a ReplicationSlotSave().
 *
 * The dirty flag is set while holding the slot's spinlock. The body locks
 * slot->mutex but writes through MyReplicationSlot. This is only consistent
 * if 'slot' is initialized from MyReplicationSlot, presumably on the missing
 * line 1178.
 *
 * NOTE(review): source lines 1176, 1178, 1180 and 1183 are missing from this
 * extract.
1174 */
1175void
1177{
1179
1181
1182 SpinLockAcquire(&slot->mutex);
1184 MyReplicationSlot->dirty = true;
1185 SpinLockRelease(&slot->mutex);
1186}
1187
1188/*
1189 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1190 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
 *
 * NOTE(review): source lines 1193, 1195, 1198, 1201 and 1204-1205 are missing
 * from this extract. They presumably include the persistency change made
 * under the spinlock and the calls that mark the slot dirty and save it.
1191 */
1192void
1194{
1196
1197 Assert(slot != NULL);
1199
1200 SpinLockAcquire(&slot->mutex);
1202 SpinLockRelease(&slot->mutex);
1203
1206}
1207
1208/*
1209 * Compute the oldest xmin across all slots and store it in the ProcArray.
1210 *
1211 * If already_locked is true, both the ReplicationSlotControlLock and the
1212 * ProcArrayLock have already been acquired exclusively. It is crucial that the
1213 * caller first acquires the ReplicationSlotControlLock, followed by the
1214 * ProcArrayLock, to prevent any undetectable deadlocks since this function
1215 * acquires them in that order.
 *
 * Only in_use, non-invalidated slots contribute. Each slot's effective_xmin
 * and effective_catalog_xmin are read, presumably under the slot's spinlock
 * (lines 1263 and 1267 are missing). The oldest valid value of each is kept
 * using TransactionIdPrecedes().
 *
 * When already_locked is false, this function itself takes
 * ReplicationSlotControlLock in shared mode, as explained by the comment in
 * the body.
 *
 * NOTE(review): source lines 1218, 1221-1222, 1224-1227, 1251, 1255, 1263,
 * 1267, 1275, 1281, 1286 and 1289 are missing from this extract.
1216 */
1217void
1219{
1220 int i;
1223
1228
1229 /*
1230 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1231 * values, so no backend updates the initial xmin for newly created slot
1232 * concurrently. A shared lock is used here to minimize lock contention,
1233 * especially when many slots exist and advancements occur frequently.
1234 * This is safe since an exclusive lock is taken during initial slot xmin
1235 * update in slot creation.
1236 *
1237 * One might think that we can hold the ProcArrayLock exclusively and
1238 * update the slot xmin values, but it could increase lock contention on
1239 * the ProcArrayLock, which is not great since this function can be called
1240 * at non-negligible frequency.
1241 *
1242 * Concurrent invocation of this function may cause the computed slot xmin
1243 * to regress. However, this is harmless because tuples prior to the most
1244 * recent xmin are no longer useful once advancement occurs (see
1245 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1246 * before updating the effective_xmin). Thus, such regression merely
1247 * prevents VACUUM from prematurely removing tuples without causing the
1248 * early deletion of required data.
1249 */
1250 if (!already_locked)
1252
1253 for (i = 0; i < max_replication_slots; i++)
1254 {
1256 TransactionId effective_xmin;
1257 TransactionId effective_catalog_xmin;
1258 bool invalidated;
1259
1260 if (!s->in_use)
1261 continue;
1262
1264 effective_xmin = s->effective_xmin;
1265 effective_catalog_xmin = s->effective_catalog_xmin;
1266 invalidated = s->data.invalidated != RS_INVAL_NONE;
1268
1269 /* invalidated slots need not apply */
1270 if (invalidated)
1271 continue;
1272
1273 /* check the data xmin */
1274 if (TransactionIdIsValid(effective_xmin) &&
1276 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1277 agg_xmin = effective_xmin;
1278
1279 /* check the catalog xmin */
1280 if (TransactionIdIsValid(effective_catalog_xmin) &&
1282 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1283 agg_catalog_xmin = effective_catalog_xmin;
1284 }
1285
1287
1288 if (!already_locked)
1290}
1291
1292/*
1293 * Compute the oldest restart LSN across all slots and inform xlog module.
1294 *
1295 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1296 * purpose, we don't try to account for that, because this module doesn't
1297 * know what to compare against.
 *
 * Only in_use, non-invalidated slots contribute. For RS_PERSISTENT slots,
 * the restart LSN is clamped down to last_saved_restart_lsn when that value
 * is valid and older, because WAL between the two may be needed after a
 * crash. The minimum valid value across slots is then kept.
 *
 * NOTE(review): source lines 1300, 1303, 1305, 1307, 1310, 1319, 1324, 1348,
 * 1352 and 1354 are missing from this extract. They include the lock calls
 * and the final hand-off of the result.
1298 */
1299void
1301{
1302 int i;
1304
1306
1308 for (i = 0; i < max_replication_slots; i++)
1309 {
1311 XLogRecPtr restart_lsn;
1312 XLogRecPtr last_saved_restart_lsn;
1313 bool invalidated;
1314 ReplicationSlotPersistency persistency;
1315
1316 if (!s->in_use)
1317 continue;
1318
1320 persistency = s->data.persistency;
1321 restart_lsn = s->data.restart_lsn;
1322 invalidated = s->data.invalidated != RS_INVAL_NONE;
1323 last_saved_restart_lsn = s->last_saved_restart_lsn;
1325
1326 /* invalidated slots need not apply */
1327 if (invalidated)
1328 continue;
1329
1330 /*
1331 * For persistent slot use last_saved_restart_lsn to compute the
1332 * oldest LSN for removal of WAL segments. The segments between
1333 * last_saved_restart_lsn and restart_lsn might be needed by a
1334 * persistent slot in the case of database crash. Non-persistent
1335 * slots can't survive the database crash, so we don't care about
1336 * last_saved_restart_lsn for them.
1337 */
1338 if (persistency == RS_PERSISTENT)
1339 {
1340 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1341 restart_lsn > last_saved_restart_lsn)
1342 {
1343 restart_lsn = last_saved_restart_lsn;
1344 }
1345 }
1346
1347 if (XLogRecPtrIsValid(restart_lsn) &&
1349 restart_lsn < min_required))
1350 min_required = restart_lsn;
1351 }
1353
1355}
1356
1357/*
1358 * Compute the oldest WAL LSN required by *logical* decoding slots.
1359 *
1360 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1361 * slots exist.
 * It also returns InvalidXLogRecPtr when every logical slot is invalidated
 * or has no valid restart LSN.
1362 *
1363 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1364 * ignores physical replication slots.
1365 *
1366 * The results aren't required frequently, so we don't maintain a precomputed
1367 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1368 */
1371{
1373 int i;
1374
1375 if (max_replication_slots <= 0)
1376 return InvalidXLogRecPtr;
1377
1379
1380 for (i = 0; i < max_replication_slots; i++)
1381 {
1382 ReplicationSlot *s;
1383 XLogRecPtr restart_lsn;
1384 XLogRecPtr last_saved_restart_lsn;
1385 bool invalidated;
1386 ReplicationSlotPersistency persistency;
1387
1389
1390 /* cannot change while ReplicationSlotControlLock is held */
1391 if (!s->in_use)
1392 continue;
1393
1394 /* we're only interested in logical slots */
1395 if (!SlotIsLogical(s))
1396 continue;
1397
1398 /* read once, it's ok if it increases while we're checking */
1400 persistency = s->data.persistency;
1401 restart_lsn = s->data.restart_lsn;
1402 invalidated = s->data.invalidated != RS_INVAL_NONE;
1403 last_saved_restart_lsn = s->last_saved_restart_lsn;
1405
1406 /* invalidated slots need not apply */
1407 if (invalidated)
1408 continue;
1409
1410 /*
1411 * For persistent slot use last_saved_restart_lsn to compute the
1412 * oldest LSN for removal of WAL segments. The segments between
1413 * last_saved_restart_lsn and restart_lsn might be needed by a
1414 * persistent slot in the case of database crash. Non-persistent
1415 * slots can't survive the database crash, so we don't care about
1416 * last_saved_restart_lsn for them.
1417 */
1418 if (persistency == RS_PERSISTENT)
1419 {
1420 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1421 restart_lsn > last_saved_restart_lsn)
1422 {
1423 restart_lsn = last_saved_restart_lsn;
1424 }
1425 }
1426
 /* slots without a valid restart LSN don't constrain the result */
1427 if (!XLogRecPtrIsValid(restart_lsn))
1428 continue;
1429
 /* keep the minimum; an invalid result means nothing found yet */
1430 if (!XLogRecPtrIsValid(result) ||
1431 restart_lsn < result)
1432 result = restart_lsn;
1433 }
1434
1436
1437 return result;
1438}
1439
1440/*
1441 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1442 * passed database oid.
1443 *
1444 * Returns true if there are any slots referencing the database. *nslots will
1445 * be set to the absolute number of slots in the database, *nactive to ones
1446 * currently active.
 *
 * Only logical slots are counted, since only they are tied to a database.
 * Invalidated slots are included in both counts.
1447 */
1448bool
1450{
1451 int i;
1452
1453 *nslots = *nactive = 0;
1454
1455 if (max_replication_slots <= 0)
1456 return false;
1457
1459 for (i = 0; i < max_replication_slots; i++)
1460 {
1461 ReplicationSlot *s;
1462
1464
1465 /* cannot change while ReplicationSlotControlLock is held */
1466 if (!s->in_use)
1467 continue;
1468
1469 /* only logical slots are database specific, skip */
1470 if (!SlotIsLogical(s))
1471 continue;
1472
1473 /* not our database, skip */
1474 if (s->data.database != dboid)
1475 continue;
1476
1477 /* NB: intentionally counting invalidated slots */
1478
1479 /* count slots with spinlock held */
1481 (*nslots)++;
1483 (*nactive)++;
1485 }
1487
1488 if (*nslots > 0)
1489 return true;
1490 return false;
1491}
1492
1493/*
1494 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1495 * passed database oid. The caller should hold an exclusive lock on the
1496 * pg_database oid for the database to prevent creation of new slots on the db
1497 * or replay from existing slots.
1498 *
1499 * Another session that concurrently acquires an existing slot on the target DB
1500 * (most likely to drop it) may cause this function to ERROR. If that happens
1501 * it may have dropped some but not all slots.
1502 *
1503 * This routine isn't as efficient as it could be - but we don't drop
1504 * databases often, especially databases with lots of slots.
1505 *
1506 * If it drops the last logical slot in the cluster, it requests to disable
1507 * logical decoding.
1508 */
1509void
1511{
1512 int i;
1514 bool dropped = false;
1515
1516 if (max_replication_slots <= 0)
1517 return;
1518
 /*
 * Each successful drop releases the control lock, so the scan restarts
 * from the beginning after every drop (see the comment before the goto).
 */
1519restart:
1522 for (i = 0; i < max_replication_slots; i++)
1523 {
1524 ReplicationSlot *s;
1525 char *slotname;
1526 ProcNumber active_proc;
1527
1529
1530 /* cannot change while ReplicationSlotControlLock is held */
1531 if (!s->in_use)
1532 continue;
1533
1534 /* only logical slots are database specific, skip */
1535 if (!SlotIsLogical(s))
1536 continue;
1537
1538 /*
1539 * Check logical slots on other databases too so we can disable
1540 * logical decoding only if no valid logical slot remains in the cluster.
1541 */
1545
1546 /* not our database, skip */
1547 if (s->data.database != dboid)
1548 continue;
1549
1550 /* NB: intentionally including invalidated slots to drop */
1551
1552 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1554 /* can't change while ReplicationSlotControlLock is held */
1555 slotname = NameStr(s->data.name);
1556 active_proc = s->active_proc;
1557 if (active_proc == INVALID_PROC_NUMBER)
1558 {
1561 }
1563
1564 /*
1565 * Even though we hold an exclusive lock on the database object a
1566 * logical slot for that DB can still be active, e.g. if it's
1567 * concurrently being dropped by a backend connected to another DB.
1568 *
1569 * That's fairly unlikely in practice, so we'll just bail out.
1570 *
1571 * The slot sync worker holds a shared lock on the database before
1572 * operating on synced logical slots to avoid conflict with the drop
1573 * happening here. The persistent synced slots are thus safe but there
1574 * is a possibility that the slot sync worker has created a temporary
1575 * slot (which stays active even on release) and we are trying to drop
1576 * that here. In practice, the chances of hitting this scenario are
1577 * less as during slot synchronization, the temporary slot is
1578 * immediately converted to persistent and thus is safe due to the
1579 * shared lock taken on the database. So, we'll just bail out in such
1580 * a case.
1581 *
1582 * XXX: We can consider shutting down the slot sync worker before
1583 * trying to drop synced temporary slots here.
1584 */
1585 if (active_proc != INVALID_PROC_NUMBER)
1586 ereport(ERROR,
1588 errmsg("replication slot \"%s\" is active for PID %d",
1589 slotname, GetPGProcByNumber(active_proc)->pid)));
1590
1591 /*
1592 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1593 * holding ReplicationSlotControlLock over filesystem operations,
1594 * release ReplicationSlotControlLock and use
1595 * ReplicationSlotDropAcquired.
1596 *
1597 * As that means the set of slots could change, restart scan from the
1598 * beginning each time we release the lock.
1599 */
1602 dropped = true;
1603 goto restart;
1604 }
1606
 /*
 * Only ask to disable logical decoding if this call dropped something and
 * the final scan found no valid logical slot anywhere in the cluster.
 */
1607 if (dropped && !found_valid_logicalslot)
1609}
1610
1611/*
1612 * Returns true if there is at least one in-use valid logical replication slot.
 *
 * Physical slots and invalidated logical slots are skipped; the scan stops
 * at the first slot that qualifies.
1613 */
1614bool
1616{
1617 bool found = false;
1618
1619 if (max_replication_slots <= 0)
1620 return false;
1621
1623 for (int i = 0; i < max_replication_slots; i++)
1624 {
1625 ReplicationSlot *s;
1626 bool invalidated;
1627
1629
1630 /* cannot change while ReplicationSlotControlLock is held */
1631 if (!s->in_use)
1632 continue;
1633
1634 if (SlotIsPhysical(s))
1635 continue;
1636
1638 invalidated = s->data.invalidated != RS_INVAL_NONE;
1640
1641 if (invalidated)
1642 continue;
1643
1644 found = true;
1645 break;
1646 }
1648
1649 return found;
1650}
1651
1652/*
1653 * Check whether the server's configuration supports using replication
1654 * slots.
 *
 * Raises ERROR if "max_replication_slots" is 0 or if "wal_level" is below
 * "replica"; returns normally otherwise.
1655 */
1656void
1658{
1659 /*
1660 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1661 * needs the same check.
1662 */
1663
1664 if (max_replication_slots == 0)
1665 ereport(ERROR,
1667 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1668
1670 ereport(ERROR,
1672 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1673}
1674
1675/*
1676 * Check whether the user has privilege to use replication slots.
 *
 * Raises ERROR ("permission denied to use replication slots") when the check
 * fails; per the error detail, only roles with the REPLICATION attribute may
 * use replication slots.
1677 */
1678void
1680{
1682 ereport(ERROR,
1684 errmsg("permission denied to use replication slots"),
1685 errdetail("Only roles with the %s attribute may use replication slots.",
1686 "REPLICATION")));
1687}
1688
1689/*
1690 * Reserve WAL for the currently active slot.
1691 *
1692 * Compute and set restart_lsn in a manner that's appropriate for the type of
1693 * the slot and concurrency safe.
 *
 * Raises ERROR if the WAL needed by the chosen restart_lsn turns out to have
 * been removed concurrently.
1694 */
1695void
1697{
1699 XLogSegNo segno;
1700 XLogRecPtr restart_lsn;
1701
1702 Assert(slot != NULL);
1705
1706 /*
1707 * The replication slot mechanism is used to prevent the removal of
1708 * required WAL.
1709 *
1710 * Acquire an exclusive lock to prevent the checkpoint process from
1711 * concurrently computing the minimum slot LSN (see
1712 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1713 * replication cannot be removed during a checkpoint.
1714 *
1715 * The mechanism is reliable because if WAL reservation occurs first, the
1716 * checkpoint must wait for the restart_lsn update before determining the
1717 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1718 * first, subsequent WAL reservations will select positions at or beyond
1719 * the redo pointer of that checkpoint.
1720 */
1722
1723 /*
1724 * For logical slots log a standby snapshot and start logical decoding at
1725 * exactly that position. That allows the slot to start up more quickly.
1726 * But on a standby we cannot do WAL writes, so just use the replay
1727 * pointer; effectively, an attempt to create a logical slot on standby
1728 * will cause it to wait for an xl_running_xact record to be logged
1729 * independently on the primary, so that a snapshot can be built using the
1730 * record.
1731 *
1732 * None of this is needed (or indeed helpful) for physical slots as
1733 * they'll start replay at the last logged checkpoint anyway. Instead,
1734 * return the location of the last redo LSN, where a base backup has to
1735 * start replay at.
1736 */
1737 if (SlotIsPhysical(slot))
1738 restart_lsn = GetRedoRecPtr();
1739 else if (RecoveryInProgress())
1740 restart_lsn = GetXLogReplayRecPtr(NULL);
1741 else
1742 restart_lsn = GetXLogInsertRecPtr();
1743
 /* the slot's non-flag fields are protected by its mutex */
1744 SpinLockAcquire(&slot->mutex);
1745 slot->data.restart_lsn = restart_lsn;
1746 SpinLockRelease(&slot->mutex);
1747
1748 /* prevent WAL removal as fast as possible */
1750
1751 /* Checkpoint shouldn't remove the required WAL. */
1753 if (XLogGetLastRemovedSegno() >= segno)
1754 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1755 NameStr(slot->data.name));
1756
1758
1759 if (!RecoveryInProgress() && SlotIsLogical(slot))
1760 {
1762
1763 /* make sure we have enough information to start */
1765
1766 /* and make sure it's fsynced to disk */
1768 }
1769}
1770
1771/*
1772 * Report that replication slot needs to be invalidated
 *
 * Emits a LOG message naming the slot.  If "terminating" is true, the message
 * says that process "pid" is being terminated to release the slot; otherwise
 * it says the slot is being invalidated.  The detail text (and, for some
 * causes, a hint) is chosen by "cause"; only the arguments relevant to that
 * cause are used.
1773 */
1774static void
1776 bool terminating,
1777 int pid,
1778 NameData slotname,
1779 XLogRecPtr restart_lsn,
1781 TransactionId snapshotConflictHorizon,
1782 long slot_idle_seconds)
1783{
1786
1789
1790 switch (cause)
1791 {
1793 {
1794 uint64 ex = oldestLSN - restart_lsn;
1795
1797 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1798 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1799 ex),
1800 LSN_FORMAT_ARGS(restart_lsn),
1801 ex);
1802 /* translator: %s is a GUC variable name */
1803 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1804 "max_slot_wal_keep_size");
1805 break;
1806 }
1807 case RS_INVAL_HORIZON:
1808 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1809 snapshotConflictHorizon);
1810 break;
1811
1812 case RS_INVAL_WAL_LEVEL:
1813 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1814 break;
1815
1817 {
1818 /* translator: %s is a GUC variable name */
1819 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1820 slot_idle_seconds, "idle_replication_slot_timeout",
1822 /* translator: %s is a GUC variable name */
1823 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1824 "idle_replication_slot_timeout");
1825 break;
1826 }
1827 case RS_INVAL_NONE:
1829 }
1830
 /* a hint is attached only if the chosen cause produced one */
1831 ereport(LOG,
1832 terminating ?
1833 errmsg("terminating process %d to release replication slot \"%s\"",
1834 pid, NameStr(slotname)) :
1835 errmsg("invalidating obsolete replication slot \"%s\"",
1836 NameStr(slotname)),
1837 errdetail_internal("%s", err_detail.data),
1838 err_hint.len ? errhint("%s", err_hint.data) : 0);
1839
1840 pfree(err_detail.data);
1841 pfree(err_hint.data);
1842}
1843
1844/*
1845 * Can we invalidate an idle replication slot?
1846 *
1847 * Idle timeout invalidation is allowed only when all of the following hold:
1848 *
1849 * 1. An idle timeout ("idle_replication_slot_timeout") is configured
1850 * 2. Slot has reserved WAL
1851 * 3. Slot is inactive
1852 * 4. The slot is not being synced from the primary while the server is in
1853 * recovery. Synced slots always appear inactive, because they don't
1854 * perform logical decoding to produce changes.
1855 */
1856static inline bool
1864
1865/*
1866 * DetermineSlotInvalidationCause - Determine the cause for which a slot
1867 * becomes invalid among the given possible causes.
1868 *
1869 * This function sequentially checks all possible invalidation causes and
1870 * returns the first one for which the slot is eligible for invalidation.
 *
 * Causes are tested in this order: RS_INVAL_WAL_REMOVED, RS_INVAL_HORIZON,
 * RS_INVAL_WAL_LEVEL, RS_INVAL_IDLE_TIMEOUT.  RS_INVAL_NONE is returned when
 * none applies.  When RS_INVAL_IDLE_TIMEOUT is returned, *inactive_since is
 * set to the slot's inactive_since (or to 0 when the "slot-timeout-inval"
 * injection point is attached).  "now" must be positive whenever the idle
 * timeout cause is being considered.
1871 */
1874 XLogRecPtr oldestLSN, Oid dboid,
1875 TransactionId snapshotConflictHorizon,
1876 TimestampTz *inactive_since, TimestampTz now)
1877{
1879
1881 {
1882 XLogRecPtr restart_lsn = s->data.restart_lsn;
1883
1884 if (XLogRecPtrIsValid(restart_lsn) &&
1885 restart_lsn < oldestLSN)
1886 return RS_INVAL_WAL_REMOVED;
1887 }
1888
1890 {
1891 /* invalid DB oid signals a shared relation */
1892 if (SlotIsLogical(s) &&
1893 (dboid == InvalidOid || dboid == s->data.database))
1894 {
1895 TransactionId effective_xmin = s->effective_xmin;
1897
 /* conflict if the slot's xmin is at or before the horizon */
1898 if (TransactionIdIsValid(effective_xmin) &&
1899 TransactionIdPrecedesOrEquals(effective_xmin,
1900 snapshotConflictHorizon))
1901 return RS_INVAL_HORIZON;
1904 snapshotConflictHorizon))
1905 return RS_INVAL_HORIZON;
1906 }
1907 }
1908
1910 {
1911 if (SlotIsLogical(s))
1912 return RS_INVAL_WAL_LEVEL;
1913 }
1914
1916 {
1917 Assert(now > 0);
1918
1919 if (CanInvalidateIdleSlot(s))
1920 {
1921 /*
1922 * Simulate the invalidation due to idle_timeout to test the
1923 * timeout behavior promptly, without waiting for it to trigger
1924 * naturally.
1925 */
1926#ifdef USE_INJECTION_POINTS
1927 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1928 {
1929 *inactive_since = 0; /* since the beginning of time */
1930 return RS_INVAL_IDLE_TIMEOUT;
1931 }
1932#endif
1933
1934 /*
1935 * Check if the slot needs to be invalidated due to
1936 * idle_replication_slot_timeout GUC.
1937 */
1940 {
1941 *inactive_since = s->inactive_since;
1942 return RS_INVAL_IDLE_TIMEOUT;
1943 }
1944 }
1945 }
1946
1947 return RS_INVAL_NONE;
1948}
1949
1950/*
1951 * Helper for InvalidateObsoleteReplicationSlots
1952 *
1953 * Acquires the given slot and marks it invalid, if necessary and possible.
1954 *
1955 * Returns true if the slot was invalidated.
1956 *
1957 * Set *released_lock_out if ReplicationSlotControlLock was released in the
1958 * interim (and in that case we're not holding the lock at return, otherwise
1959 * we are).
1960 *
1961 * This is inherently racy, because we release the LWLock
1962 * for syscalls, so caller must restart if we return true.
 *
 * If the slot is currently owned by another process, that process is
 * signalled (at most once per PID) and we wait for the slot to be released
 * before re-checking; that is the only reason this function loops.
1963 */
1964static bool
1966 ReplicationSlot *s,
1968 Oid dboid, TransactionId snapshotConflictHorizon,
1969 bool *released_lock_out)
1970{
1971 int last_signaled_pid = 0;
1972 bool released_lock = false;
1973 bool invalidated = false;
1974 TimestampTz inactive_since = 0;
1975
1976 for (;;)
1977 {
1978 XLogRecPtr restart_lsn;
1979 NameData slotname;
1980 ProcNumber active_proc;
1981 int active_pid = 0;
1983 TimestampTz now = 0;
1984 long slot_idle_secs = 0;
1985
1987
1988 if (!s->in_use)
1989 {
1990 if (released_lock)
1992 break;
1993 }
1994
1996 {
1997 /*
1998 * Assign the current time here to avoid system call overhead
1999 * while holding the spinlock in subsequent code.
2000 */
2002 }
2003
2004 /*
2005 * Check if the slot needs to be invalidated. If it needs to be
2006 * invalidated, and is not currently acquired, acquire it and mark it
2007 * as having been invalidated. We do this with the spinlock held to
2008 * avoid race conditions -- for example the restart_lsn could move
2009 * forward, or the slot could be dropped.
2010 */
2012
2013 restart_lsn = s->data.restart_lsn;
2014
2015 /* we do nothing if the slot is already invalid */
2016 if (s->data.invalidated == RS_INVAL_NONE)
2018 s, oldestLSN,
2019 dboid,
2020 snapshotConflictHorizon,
2021 &inactive_since,
2022 now);
2023
2024 /* if there's no invalidation, we're done */
2026 {
2028 if (released_lock)
2030 break;
2031 }
2032
2033 slotname = s->data.name;
2034 active_proc = s->active_proc;
2035
2036 /*
2037 * If the slot can be acquired, do so and mark it invalidated
2038 * immediately. Otherwise we'll signal the owning process, below, and
2039 * retry.
2040 *
2041 * Note: Unlike other slot attributes, slot's inactive_since can't be
2042 * changed until the acquired slot is released or the owning process
2043 * is terminated. So, the inactive slot can only be invalidated
2044 * immediately without being terminated.
2045 */
2046 if (active_proc == INVALID_PROC_NUMBER)
2047 {
2051
2052 /*
2053 * XXX: We should consider not overwriting restart_lsn and instead
2054 * just rely on .invalidated.
2055 */
2057 {
2060 }
2061
2062 /* Let caller know */
2063 invalidated = true;
2064 }
2065 else
2066 {
2067 active_pid = GetPGProcByNumber(active_proc)->pid;
2068 Assert(active_pid != 0);
2069 }
2070
2072
2073 /*
2074 * Calculate the idle time duration of the slot if slot is marked
2075 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2076 */
2078 {
2079 int slot_idle_usecs;
2080
2081 TimestampDifference(inactive_since, now, &slot_idle_secs,
2083 }
2084
2085 if (active_proc != INVALID_PROC_NUMBER)
2086 {
2087 /*
2088 * Prepare the sleep on the slot's condition variable before
2089 * releasing the lock, to close a possible race condition if the
2090 * slot is released before the sleep below.
2091 */
2093
2095 released_lock = true;
2096
2097 /*
2098 * Signal to terminate the process that owns the slot, if we
2099 * haven't already signalled it. (Avoidance of repeated
2100 * signalling is the only reason for there to be a loop in this
2101 * routine; otherwise we could rely on caller's restart loop.)
2102 *
2103 * There is the race condition that other process may own the slot
2104 * after its current owner process is terminated and before this
2105 * process owns it. To handle that, we signal only if the PID of
2106 * the owning process has changed from the previous time. (This
2107 * logic assumes that the same PID is not reused very quickly.)
2108 */
2110 {
2112 slotname, restart_lsn,
2113 oldestLSN, snapshotConflictHorizon,
2115
 /* outside the startup process, the owner gets a plain SIGTERM */
2116 if (MyBackendType == B_STARTUP)
2118 active_pid,
2120 else
2121 (void) kill(active_pid, SIGTERM);
2122
2124 }
2125
2126 /* Wait until the slot is released. */
2129
2130 /*
2131 * Re-acquire lock and start over; we expect to invalidate the
2132 * slot next time (unless another process acquires the slot in the
2133 * meantime).
2134 *
2135 * Note: It is possible for a slot to advance its restart_lsn or
2136 * xmin values sufficiently between when we release the mutex and
2137 * when we recheck, moving from a conflicting state to a non
2138 * conflicting state. This is intentional and safe: if the slot
2139 * has caught up while we're busy here, the resources we were
2140 * concerned about (WAL segments or tuples) have not yet been
2141 * removed, and there's no reason to invalidate the slot.
2142 */
2144 continue;
2145 }
2146 else
2147 {
2148 /*
2149 * We hold the slot now and have already invalidated it; flush it
2150 * to ensure that state persists.
2151 *
2152 * Don't want to hold ReplicationSlotControlLock across file
2153 * system operations, so release it now but be sure to tell caller
2154 * to restart from scratch.
2155 */
2157 released_lock = true;
2158
2159 /* Make sure the invalidated state persists across server restart */
2163
2165 slotname, restart_lsn,
2166 oldestLSN, snapshotConflictHorizon,
2168
2169 /* done with this slot for now */
2170 break;
2171 }
2172 }
2173
2175
2177 return invalidated;
2178}
2179
2180/*
2181 * Invalidate slots that require resources about to be removed.
2182 *
2183 * Returns true if any slot was invalidated.
2184 *
2185 * Whether a slot needs to be invalidated depends on the invalidation cause.
2186 * A slot is invalidated if it:
2187 * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
2188 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2189 * db; dboid may be InvalidOid for shared relations
2190 * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2191 * logical.
2192 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2193 * "idle_replication_slot_timeout" duration.
2194 *
2195 * Note: This function attempts to invalidate the slot for multiple possible
2196 * causes in a single pass, minimizing redundant iterations. The
2197 * "possible_causes" parameter is a bitmask of one or more of the causes above.
2198 *
2199 * If it invalidates the last logical slot in the cluster, it requests to
2200 * disable logical decoding.
2201 *
2202 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2203 */
2204bool
2206 XLogSegNo oldestSegno, Oid dboid,
2207 TransactionId snapshotConflictHorizon)
2208{
2210 bool invalidated = false;
2211 bool invalidated_logical = false;
2213
2214 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2217
2218 if (max_replication_slots == 0)
2219 return invalidated;
2220
2222
 /* rescanned from the top whenever a helper call released the lock */
2223restart:
2226 for (int i = 0; i < max_replication_slots; i++)
2227 {
2229 bool released_lock = false;
2230
2231 if (!s->in_use)
2232 continue;
2233
2234 /* Prevent invalidation of logical slots during binary upgrade */
2236 {
2240
2241 continue;
2242 }
2243
2245 dboid, snapshotConflictHorizon,
2246 &released_lock))
2247 {
2249
2250 /* Remember we have invalidated a physical or logical slot */
2251 invalidated = true;
2252
2253 /*
2254 * Additionally, remember we have invalidated a logical slot as we
2255 * can request disabling logical decoding later.
2256 */
2257 if (SlotIsLogical(s))
2258 invalidated_logical = true;
2259 }
2260 else
2261 {
2262 /*
2263 * We need to check if the slot is invalidated here since
2264 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2265 * is already invalidated.
2266 */
2271 }
2272
2273 /* if the lock was released, start from scratch */
2274 if (released_lock)
2275 goto restart;
2276 }
2278
2279 /*
2280 * If any slots have been invalidated, recalculate the resource limits.
2281 */
2282 if (invalidated)
2283 {
2286 }
2287
2288 /*
2289 * Request the checkpointer to disable logical decoding if no valid
2290 * logical slots remain. If called by the checkpointer during a
2291 * checkpoint, only the request is initiated; actual deactivation is
2292 * deferred until after the checkpoint completes.
2293 */
2296
2297 return invalidated;
2298}
2299
2300/*
2301 * Flush all replication slots to disk.
2302 *
2303 * It is convenient to flush dirty replication slots at the time of checkpoint.
2304 * Additionally, in case of a shutdown checkpoint, we also identify the slots
2305 * for which the confirmed_flush LSN has been updated since the last time it
2306 * was saved and flush them.
 *
 * Write failures are reported at LOG level (SaveSlotToPath is called with
 * elevel LOG) rather than raising an error.
2307 */
2308void
2310{
2311 int i;
2312 bool last_saved_restart_lsn_updated = false;
2313
2314 elog(DEBUG1, "performing replication slot checkpoint");
2315
2316 /*
2317 * Prevent any slot from being created/dropped while we're active. As we
2318 * explicitly do *not* want to block iterating over replication_slots or
2319 * acquiring a slot we cannot take the control lock - but that's OK,
2320 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2321 * enough to guarantee that nobody can change the in_use bits on us.
2322 *
2323 * Additionally, acquiring the Allocation lock is necessary to serialize
2324 * the slot flush process with concurrent slot WAL reservation. This
2325 * ensures that the WAL position being reserved is either flushed to disk
2326 * or is beyond or equal to the redo pointer of the current checkpoint
2327 * (See ReplicationSlotReserveWal for details).
2328 */
2330
2331 for (i = 0; i < max_replication_slots; i++)
2332 {
2334 char path[MAXPGPATH];
2335
2336 if (!s->in_use)
2337 continue;
2338
2339 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2340 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2341
2342 /*
2343 * Slot's data is not flushed each time the confirmed_flush LSN is
2344 * updated as that could lead to frequent writes. However, we decide
2345 * to force a flush of all logical slot's data at the time of shutdown
2346 * if the confirmed_flush LSN is changed since we last flushed it to
2347 * disk. This helps in avoiding an unnecessary retreat of the
2348 * confirmed_flush LSN after restart.
 *
 * Invalidated slots are never forced dirty here.
2349 */
2350 if (is_shutdown && SlotIsLogical(s))
2351 {
2353
2354 if (s->data.invalidated == RS_INVAL_NONE &&
2356 {
2357 s->just_dirtied = true;
2358 s->dirty = true;
2359 }
2361 }
2362
2363 /*
2364 * Track if we're going to update slot's last_saved_restart_lsn. We
2365 * need this to know if we need to recompute the required LSN.
2366 */
2369
2370 SaveSlotToPath(s, path, LOG);
2371 }
2373
2374 /*
2375 * Recompute the required LSN if SaveSlotToPath() updated
2376 * last_saved_restart_lsn for any slot.
2377 */
2380}
2381
2382/*
2383 * Load all replication slots from disk into memory at server startup. This
2384 * needs to be run before we start crash recovery.
 *
 * Leftover "*.tmp" directories from an interrupted create/drop are removed
 * (a removal failure is reported and that entry is skipped); every other
 * directory entry is restored as a slot.
2385 */
2386void
2388{
2390 struct dirent *replication_de;
2391
2392 elog(DEBUG1, "starting up replication slots");
2393
2394 /* restore all slots by iterating over all on-disk entries */
2397 {
2398 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2400
2401 if (strcmp(replication_de->d_name, ".") == 0 ||
2402 strcmp(replication_de->d_name, "..") == 0)
2403 continue;
2404
2405 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2407
2408 /* we only ever create directories here, so skip anything that isn't ours */
2410 continue;
2411
2412 /* we crashed while a slot was being set up or deleted, clean up */
2413 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2414 {
2415 if (!rmtree(path, true))
2416 {
2418 (errmsg("could not remove directory \"%s\"",
2419 path)));
2420 continue;
2421 }
2423 continue;
2424 }
2425
2426 /* looks like a slot in a normal state, restore */
2428 }
2430
2431 /* currently no slots exist, we're done. */
2432 if (max_replication_slots <= 0)
2433 return;
2434
2435 /* Now that we have recovered all the data, compute replication xmin */
2438}
2439
2440/* ----
2441 * Manipulation of on-disk state of replication slots
2442 *
2443 * NB: none of the routines below should take any notice of whether a slot is
2444 * the current one or not; that's all handled a layer above.
2445 * ----
2446 */
/*
 * Create the on-disk representation of a newly created slot: build it in a
 * "<name>.tmp" directory under PG_REPLSLOT_DIR, write the state file there,
 * then rename the directory into place and fsync it.  Failures to create or
 * rename the directory raise ERROR.
 */
2447static void
2449{
2450 char tmppath[MAXPGPATH];
2451 char path[MAXPGPATH];
2452 struct stat st;
2453
2454 /*
2455 * No need to take out the io_in_progress_lock, nobody else can see this
2456 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2457 * takes out the lock, if we'd take the lock here, we'd deadlock.
2458 */
2459
2460 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2461 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2462
2463 /*
2464 * It's just barely possible that some previous effort to create or drop a
2465 * slot with this name left a temp directory lying around. If that seems
2466 * to be the case, try to remove it. If the rmtree() fails, we'll error
2467 * out at the MakePGDirectory() below, so we don't bother checking
2468 * success.
2469 */
2470 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2471 rmtree(tmppath, true);
2472
2473 /* Create and fsync the temporary slot directory. */
2474 if (MakePGDirectory(tmppath) < 0)
2475 ereport(ERROR,
2477 errmsg("could not create directory \"%s\": %m",
2478 tmppath)));
2479 fsync_fname(tmppath, true);
2480
2481 /* Write the actual state file. */
2482 slot->dirty = true; /* signal that we really need to write */
2484
2485 /* Rename the directory into place. */
2486 if (rename(tmppath, path) != 0)
2487 ereport(ERROR,
2489 errmsg("could not rename file \"%s\" to \"%s\": %m",
2490 tmppath, path)));
2491
2492 /*
2493 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2494 * would persist after an OS crash or not - so, force a restart. The
2495 * restart would try to fsync this again till it works.
2496 */
2498
2499 fsync_fname(path, true);
2501
2503}
2504
2505/*
2506 * Shared functionality between saving and creating a replication slot.
2507 */
2508static void
2509SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2510{
2511 char tmppath[MAXPGPATH];
2512 char path[MAXPGPATH];
2513 int fd;
2515 bool was_dirty;
2516
2517 /* first check whether there's something to write out */
2518 SpinLockAcquire(&slot->mutex);
2519 was_dirty = slot->dirty;
2520 slot->just_dirtied = false;
2521 SpinLockRelease(&slot->mutex);
2522
2523 /* and don't do anything if there's nothing to write */
2524 if (!was_dirty)
2525 return;
2526
2528
2529 /* silence valgrind :( */
2530 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2531
2532 sprintf(tmppath, "%s/state.tmp", dir);
2533 sprintf(path, "%s/state", dir);
2534
2536 if (fd < 0)
2537 {
2538 /*
2539 * If not an ERROR, then release the lock before returning. In case
2540 * of an ERROR, the error recovery path automatically releases the
2541 * lock, but no harm in explicitly releasing even in that case. Note
2542 * that LWLockRelease() could affect errno.
2543 */
2544 int save_errno = errno;
2545
2547 errno = save_errno;
2548 ereport(elevel,
2550 errmsg("could not create file \"%s\": %m",
2551 tmppath)));
2552 return;
2553 }
2554
2555 cp.magic = SLOT_MAGIC;
2556 INIT_CRC32C(cp.checksum);
2557 cp.version = SLOT_VERSION;
2559
2560 SpinLockAcquire(&slot->mutex);
2561
2562 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2563
2564 SpinLockRelease(&slot->mutex);
2565
2566 COMP_CRC32C(cp.checksum,
2569 FIN_CRC32C(cp.checksum);
2570
2571 errno = 0;
2573 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2574 {
2575 int save_errno = errno;
2576
2579 unlink(tmppath);
2581
2582 /* if write didn't set errno, assume problem is no disk space */
2584 ereport(elevel,
2586 errmsg("could not write to file \"%s\": %m",
2587 tmppath)));
2588 return;
2589 }
2591
2592 /* fsync the temporary file */
2594 if (pg_fsync(fd) != 0)
2595 {
2596 int save_errno = errno;
2597
2600 unlink(tmppath);
2602
2603 errno = save_errno;
2604 ereport(elevel,
2606 errmsg("could not fsync file \"%s\": %m",
2607 tmppath)));
2608 return;
2609 }
2611
2612 if (CloseTransientFile(fd) != 0)
2613 {
2614 int save_errno = errno;
2615
2616 unlink(tmppath);
2618
2619 errno = save_errno;
2620 ereport(elevel,
2622 errmsg("could not close file \"%s\": %m",
2623 tmppath)));
2624 return;
2625 }
2626
2627 /* rename to permanent file, fsync file and directory */
2628 if (rename(tmppath, path) != 0)
2629 {
2630 int save_errno = errno;
2631
2632 unlink(tmppath);
2634
2635 errno = save_errno;
2636 ereport(elevel,
2638 errmsg("could not rename file \"%s\" to \"%s\": %m",
2639 tmppath, path)));
2640 return;
2641 }
2642
2643 /*
2644 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2645 */
2647
2648 fsync_fname(path, false);
2649 fsync_fname(dir, true);
2651
2653
2654 /*
2655 * Successfully wrote, unset dirty bit, unless somebody dirtied again
2656 * already, and remember the saved confirmed_flush and restart_lsn values.
2657 */
2658 SpinLockAcquire(&slot->mutex);
2659 if (!slot->just_dirtied)
2660 slot->dirty = false;
2661 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2662 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2663 SpinLockRelease(&slot->mutex);
2664
2666}
2667
2668/*
2669 * Load a single slot from disk into memory.
2670 */
2671static void
2673{
2675 int i;
2676 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2677 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2678 int fd;
2679 bool restored = false;
2680 int readBytes;
2681 pg_crc32c checksum;
2682 TimestampTz now = 0;
2683
2684 /* no need to lock here, no concurrent access allowed yet */
2685
2686 /* delete temp file if it exists */
2687 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2688 sprintf(path, "%s/state.tmp", slotdir);
2689 if (unlink(path) < 0 && errno != ENOENT)
2690 ereport(PANIC,
2692 errmsg("could not remove file \"%s\": %m", path)));
2693
2694 sprintf(path, "%s/state", slotdir);
2695
2696 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2697
2698 /* on some operating systems fsyncing a file requires O_RDWR */
2700
2701 /*
2702 * We do not need to handle this as we are rename()ing the directory into
2703 * place only after we fsync()ed the state file.
2704 */
2705 if (fd < 0)
2706 ereport(PANIC,
2708 errmsg("could not open file \"%s\": %m", path)));
2709
2710 /*
2711 * Sync state file before we're reading from it. We might have crashed
2712 * while it wasn't synced yet and we shouldn't continue on that basis.
2713 */
2715 if (pg_fsync(fd) != 0)
2716 ereport(PANIC,
2718 errmsg("could not fsync file \"%s\": %m",
2719 path)));
2721
2722 /* Also sync the parent directory */
2724 fsync_fname(slotdir, true);
2726
2727 /* read part of statefile that's guaranteed to be version independent */
2732 {
2733 if (readBytes < 0)
2734 ereport(PANIC,
2736 errmsg("could not read file \"%s\": %m", path)));
2737 else
2738 ereport(PANIC,
2740 errmsg("could not read file \"%s\": read %d of %zu",
2741 path, readBytes,
2743 }
2744
2745 /* verify magic */
2746 if (cp.magic != SLOT_MAGIC)
2747 ereport(PANIC,
2749 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2750 path, cp.magic, SLOT_MAGIC)));
2751
2752 /* verify version */
2753 if (cp.version != SLOT_VERSION)
2754 ereport(PANIC,
2756 errmsg("replication slot file \"%s\" has unsupported version %u",
2757 path, cp.version)));
2758
2759 /* boundary check on length */
2760 if (cp.length != ReplicationSlotOnDiskV2Size)
2761 ereport(PANIC,
2763 errmsg("replication slot file \"%s\" has corrupted length %u",
2764 path, cp.length)));
2765
2766 /* Now that we know the size, read the entire file */
2768 readBytes = read(fd,
2770 cp.length);
2772 if (readBytes != cp.length)
2773 {
2774 if (readBytes < 0)
2775 ereport(PANIC,
2777 errmsg("could not read file \"%s\": %m", path)));
2778 else
2779 ereport(PANIC,
2781 errmsg("could not read file \"%s\": read %d of %zu",
2782 path, readBytes, (Size) cp.length)));
2783 }
2784
2785 if (CloseTransientFile(fd) != 0)
2786 ereport(PANIC,
2788 errmsg("could not close file \"%s\": %m", path)));
2789
2790 /* now verify the CRC */
2791 INIT_CRC32C(checksum);
2792 COMP_CRC32C(checksum,
2795 FIN_CRC32C(checksum);
2796
2797 if (!EQ_CRC32C(checksum, cp.checksum))
2798 ereport(PANIC,
2799 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2800 path, checksum, cp.checksum)));
2801
2802 /*
2803 * If we crashed with an ephemeral slot active, don't restore but delete
2804 * it.
2805 */
2806 if (cp.slotdata.persistency != RS_PERSISTENT)
2807 {
2808 if (!rmtree(slotdir, true))
2809 {
2811 (errmsg("could not remove directory \"%s\"",
2812 slotdir)));
2813 }
2815 return;
2816 }
2817
2818 /*
2819 * Verify that requirements for the specific slot type are met. That's
2820 * important because if these aren't met we're not guaranteed to retain
2821 * all the necessary resources for the slot.
2822 *
2823 * NB: We have to do so *after* the above checks for ephemeral slots,
2824 * because otherwise a slot that shouldn't exist anymore could prevent
2825 * restarts.
2826 *
2827 * NB: Changing the requirements here also requires adapting
2828 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2829 */
2830 if (cp.slotdata.database != InvalidOid)
2831 {
2833 ereport(FATAL,
2835 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2836 NameStr(cp.slotdata.name)),
2837 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2838
2839 /*
2840 * In standby mode, the hot standby must be enabled. This check is
2841 * necessary to ensure logical slots are invalidated when they become
2842 * incompatible due to insufficient wal_level. Otherwise, if the
2843 * primary reduces effective_wal_level below logical (i.e., disables
2844 * logical decoding) while hot standby is disabled, logical slots would
2845 * remain valid even after promotion.
2846 */
2848 ereport(FATAL,
2850 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2851 NameStr(cp.slotdata.name)),
2852 errhint("Change \"hot_standby\" to be \"on\".")));
2853 }
2854 else if (wal_level < WAL_LEVEL_REPLICA)
2855 ereport(FATAL,
2857 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2858 NameStr(cp.slotdata.name)),
2859 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2860
2861 /* nothing can be active yet, don't lock anything */
2862 for (i = 0; i < max_replication_slots; i++)
2863 {
2864 ReplicationSlot *slot;
2865
2867
2868 if (slot->in_use)
2869 continue;
2870
2871 /* restore the entire set of persistent data */
2872 memcpy(&slot->data, &cp.slotdata,
2874
2875 /* initialize in memory state */
2876 slot->effective_xmin = cp.slotdata.xmin;
2877 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2878 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2879 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2880
2885
2886 slot->in_use = true;
2888
2889 /*
2890 * Set the time since the slot has become inactive after loading the
2891 * slot from the disk into memory. Whoever acquires the slot i.e.
2892 * makes the slot active will reset it. Use the same inactive_since
2893 * time for all the slots.
2894 */
2895 if (now == 0)
2897
2899
2900 restored = true;
2901 break;
2902 }
2903
2904 if (!restored)
2905 ereport(FATAL,
2906 (errmsg("too many replication slots active before shutdown"),
2907 errhint("Increase \"max_replication_slots\" and try again.")));
2908}
2909
2910/*
2911 * Maps an invalidation reason for a replication slot to
2912 * ReplicationSlotInvalidationCause.
2913 */
2915GetSlotInvalidationCause(const char *cause_name)
2916{
2917 Assert(cause_name);
2918
2919 /* Search lookup table for the cause having this name */
2920 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2921 {
2922 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2924 }
2925
2926 Assert(false);
2927 return RS_INVAL_NONE; /* to keep compiler quiet */
2928}
2929
2930/*
2931 * Maps a ReplicationSlotInvalidationCause to the invalidation
2932 * reason for a replication slot.
2933 */
2934const char *
2936{
2937 /* Search lookup table for the name of this cause */
2938 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2939 {
2940 if (SlotInvalidationCauses[i].cause == cause)
2942 }
2943
2944 Assert(false);
2945 return "none"; /* to keep compiler quiet */
2946}
2947
2948/*
2949 * A helper function to validate slots specified in GUC synchronized_standby_slots.
2950 *
2951 * The rawname will be parsed, and the result will be saved into *elemlist.
2952 */
2953static bool
2955{
2956 /* Verify syntax and parse string into a list of identifiers */
2958 {
2959 GUC_check_errdetail("List syntax is invalid.");
2960 return false;
2961 }
2962
2963 /* Iterate the list to validate each slot name */
2964 foreach_ptr(char, name, *elemlist)
2965 {
2966 int err_code;
2967 char *err_msg = NULL;
2968 char *err_hint = NULL;
2969
2971 &err_msg, &err_hint))
2972 {
2974 GUC_check_errdetail("%s", err_msg);
2975 if (err_hint != NULL)
2977 return false;
2978 }
2979 }
2980
2981 return true;
2982}
2983
2984/*
2985 * GUC check_hook for synchronized_standby_slots
2986 */
2987bool
2989{
2990 char *rawname;
2991 char *ptr;
2992 List *elemlist;
2993 int size;
2994 bool ok;
2996
2997 if ((*newval)[0] == '\0')
2998 return true;
2999
3000 /* Need a modifiable copy of the GUC string */
3002
3003 /* Now verify if the specified slots exist and have correct type */
3005
3006 if (!ok || elemlist == NIL)
3007 {
3008 pfree(rawname);
3010 return ok;
3011 }
3012
3013 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3014 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3015 foreach_ptr(char, slot_name, elemlist)
3016 size += strlen(slot_name) + 1;
3017
3018 /* GUC extra value must be guc_malloc'd, not palloc'd */
3019 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3020 if (!config)
3021 return false;
3022
3023 /* Transform the data into SyncStandbySlotsConfigData */
3024 config->nslotnames = list_length(elemlist);
3025
3026 ptr = config->slot_names;
3027 foreach_ptr(char, slot_name, elemlist)
3028 {
3029 strcpy(ptr, slot_name);
3030 ptr += strlen(slot_name) + 1;
3031 }
3032
3033 *extra = config;
3034
3035 pfree(rawname);
3037 return true;
3038}
3039
3040/*
3041 * GUC assign_hook for synchronized_standby_slots
3042 */
3043void
3045{
3046 /*
3047 * The standby slots may have changed, so we must recompute the oldest
3048 * LSN.
3049 */
3051
3053}
3054
3055/*
3056 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3057 */
3058bool
3059SlotExistsInSyncStandbySlots(const char *slot_name)
3060{
3061 const char *standby_slot_name;
3062
3063 /* Return false if there is no value in synchronized_standby_slots */
3065 return false;
3066
3067 /*
3068 * XXX: We are not expecting this list to be long so a linear search
3069 * shouldn't hurt but if that turns out not to be true then we can cache
3070 * this information for each WalSender as well.
3071 */
3073 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3074 {
3075 if (strcmp(standby_slot_name, slot_name) == 0)
3076 return true;
3077
3079 }
3080
3081 return false;
3082}
3083
3084/*
3085 * Return true if the slots specified in synchronized_standby_slots have caught up to
3086 * the given WAL location, false otherwise.
3087 *
3088 * The elevel parameter specifies the error level used for logging messages
3089 * related to slots that do not exist, are invalidated, or are inactive.
3090 */
3091bool
3093{
3094 const char *name;
3095 int caught_up_slot_num = 0;
3097
3098 /*
3099 * Don't need to wait for the standbys to catch up if there is no value in
3100 * synchronized_standby_slots.
3101 */
3103 return true;
3104
3105 /*
3106 * Don't need to wait for the standbys to catch up if we are on a standby
3107 * server, since we do not support syncing slots to cascading standbys.
3108 */
3109 if (RecoveryInProgress())
3110 return true;
3111
3112 /*
3113 * Don't need to wait for the standbys to catch up if they are already
3114 * beyond the specified WAL location.
3115 */
3118 return true;
3119
3120 /*
3121 * To prevent concurrent slot dropping and creation while filtering the
3122 * slots, take the ReplicationSlotControlLock outside of the loop.
3123 */
3125
3127 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3128 {
3129 XLogRecPtr restart_lsn;
3130 bool invalidated;
3131 bool inactive;
3132 ReplicationSlot *slot;
3133
3134 slot = SearchNamedReplicationSlot(name, false);
3135
3136 /*
3137 * If a slot name provided in synchronized_standby_slots does not
3138 * exist, report a message and exit the loop.
3139 */
3140 if (!slot)
3141 {
3142 ereport(elevel,
3144 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3145 name, "synchronized_standby_slots"),
3146 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3147 name),
3148 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3149 name, "synchronized_standby_slots"));
3150 break;
3151 }
3152
3153 /* Same as above: if a slot is not physical, exit the loop. */
3154 if (SlotIsLogical(slot))
3155 {
3156 ereport(elevel,
3158 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3159 name, "synchronized_standby_slots"),
3160 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3161 name),
3162 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3163 name, "synchronized_standby_slots"));
3164 break;
3165 }
3166
3167 SpinLockAcquire(&slot->mutex);
3168 restart_lsn = slot->data.restart_lsn;
3169 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3171 SpinLockRelease(&slot->mutex);
3172
3173 if (invalidated)
3174 {
3175 /* Specified physical slot has been invalidated */
3176 ereport(elevel,
3178 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3179 name, "synchronized_standby_slots"),
3180 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3181 name),
3182 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3183 name, "synchronized_standby_slots"));
3184 break;
3185 }
3186
3187 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3188 {
3189 /* Log a message if no active_pid for this physical slot */
3190 if (inactive)
3191 ereport(elevel,
3193 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3194 name, "synchronized_standby_slots"),
3195 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3196 name),
3197 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3198 name, "synchronized_standby_slots"));
3199
3200 /* Exit the loop if the current slot hasn't caught up. */
3201 break;
3202 }
3203
3204 Assert(restart_lsn >= wait_for_lsn);
3205
3207 min_restart_lsn > restart_lsn)
3208 min_restart_lsn = restart_lsn;
3209
3211
3212 name += strlen(name) + 1;
3213 }
3214
3216
3217 /*
3218 * Return false if not all the standbys have caught up to the specified
3219 * WAL location.
3220 */
3222 return false;
3223
3224 /* The ss_oldest_flush_lsn must not retreat. */
3227
3229
3230 return true;
3231}
3232
3233/*
3234 * Wait for physical standbys to confirm receiving the given lsn.
3235 *
3236 * Used by logical decoding SQL functions. It waits for physical standbys
3237 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3238 */
3239void
3241{
3242 /*
3243 * Don't need to wait for the standby to catch up if the current acquired
3244 * slot is not a logical failover slot, or there is no value in
3245 * synchronized_standby_slots.
3246 */
3248 return;
3249
3251
3252 for (;;)
3253 {
3255
3257 {
3258 ConfigReloadPending = false;
3260 }
3261
3262 /* Exit if done waiting for every slot. */
3264 break;
3265
3266 /*
3267 * Wait for the slots in the synchronized_standby_slots to catch up,
3268 * but use a timeout (1s) so we can also check if the
3269 * synchronized_standby_slots has been changed.
3270 */
3273 }
3274
3276}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition timestamp.c:1721
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition timestamp.c:1795
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
#define NameStr(name)
Definition c.h:765
#define ngettext(s, p, n)
Definition c.h:1186
#define Assert(condition)
Definition c.h:873
#define PG_BINARY
Definition c.h:1297
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:480
uint64_t uint64
Definition c.h:547
#define pg_unreachable()
Definition c.h:341
uint32_t uint32
Definition c.h:546
#define lengthof(array)
Definition c.h:803
#define MemSet(start, val, len)
Definition c.h:1023
#define StaticAssertDecl(condition, errmessage)
Definition c.h:938
uint32 TransactionId
Definition c.h:666
size_t Size
Definition c.h:619
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition timestamp.h:39
int errmsg_internal(const char *fmt,...)
Definition elog.c:1171
int errdetail_internal(const char *fmt,...)
Definition elog.c:1244
int errcode_for_file_access(void)
Definition elog.c:887
int errdetail(const char *fmt,...)
Definition elog.c:1217
int errhint_internal(const char *fmt,...)
Definition elog.c:1353
int errhint(const char *fmt,...)
Definition elog.c:1331
int errcode(int sqlerrcode)
Definition elog.c:864
int errmsg(const char *fmt,...)
Definition elog.c:1081
#define _(x)
Definition elog.c:91
#define LOG
Definition elog.h:31
#define FATAL
Definition elog.h:41
#define WARNING
Definition elog.h:36
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int MakePGDirectory(const char *directoryName)
Definition fd.c:3962
int FreeDir(DIR *dir)
Definition fd.c:3008
int CloseTransientFile(int fd)
Definition fd.c:2854
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:756
DIR * AllocateDir(const char *dirname)
Definition fd.c:2890
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2956
int pg_fsync(int fd)
Definition fd.c:389
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2677
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_DIR
Definition file_utils.h:23
@ PGFILETYPE_ERROR
Definition file_utils.h:20
bool IsBinaryUpgrade
Definition globals.c:121
ProcNumber MyProcNumber
Definition globals.c:90
bool IsUnderPostmaster
Definition globals.c:120
Oid MyDatabaseId
Definition globals.c:94
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
void GUC_check_errcode(int sqlerrcode)
Definition guc.c:6628
void * guc_malloc(int elevel, size_t size)
Definition guc.c:636
#define newval
#define GUC_check_errdetail
Definition guc.h:506
GucSource
Definition guc.h:112
@ PGC_SIGHUP
Definition guc.h:75
#define GUC_check_errhint
Definition guc.h:510
#define IS_INJECTION_POINT_ATTACHED(name)
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int i
Definition isn.c:77
bool IsLogicalLauncher(void)
Definition launcher.c:1587
void list_free(List *list)
Definition list.c:1546
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:434
bool LWLockHeldByMe(LWLock *lock)
Definition lwlock.c:1911
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1955
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
#define START_CRIT_SECTION()
Definition miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
@ B_STARTUP
Definition miscadmin.h:365
#define END_CRIT_SECTION()
Definition miscadmin.h:152
Oid GetUserId(void)
Definition miscinit.c:469
BackendType MyBackendType
Definition miscinit.c:64
bool has_rolreplication(Oid roleid)
Definition miscinit.c:688
void namestrcpy(Name name, const char *str)
Definition name.c:233
void * arg
#define ERRCODE_DATA_CORRUPTED
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define EQ_CRC32C(c1, c2)
Definition pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static bool two_phase
static bool failover
static rewind_source * source
Definition pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition port.h:262
#define snprintf
Definition port.h:260
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
#define GetPGProcByNumber(n)
Definition proc.h:461
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition procarray.c:3954
bool SignalRecoveryConflict(PGPROC *proc, pid_t pid, RecoveryConflictReason reason)
Definition procarray.c:3458
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition slot.c:574
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:621
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition slot.c:1965
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition slot.c:113
char * synchronized_standby_slots
Definition slot.c:164
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition slot.c:3044
#define ReplicationSlotOnDiskChecksummedSize
Definition slot.c:135
void CheckPointReplicationSlots(bool is_shutdown)
Definition slot.c:2309
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition slot.c:379
int idle_replication_slot_timeout_secs
Definition slot.c:158
void ReplicationSlotDropAcquired(void)
Definition slot.c:1034
void ReplicationSlotMarkDirty(void)
Definition slot.c:1176
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition slot.c:1873
void ReplicationSlotReserveWal(void)
Definition slot.c:1696
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition slot.c:1449
static XLogRecPtr ss_oldest_flush_lsn
Definition slot.c:173
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:312
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition slot.c:1510
#define ReplicationSlotOnDiskNotChecksummedSize
Definition slot.c:132
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1370
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition slot.c:2915
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1218
static void RestoreSlotFromDisk(const char *name)
Definition slot.c:2672
void ReplicationSlotPersist(void)
Definition slot.c:1193
bool CheckLogicalSlotExists(void)
Definition slot.c:1615
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition slot.c:1775
ReplicationSlot * MyReplicationSlot
Definition slot.c:148
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition slot.c:2509
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:912
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3059
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition slot.c:2954
void ReplicationSlotSave(void)
Definition slot.c:1158
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:541
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition slot.c:2448
#define ReplicationSlotOnDiskV2Size
Definition slot.c:138
void CheckSlotPermissions(void)
Definition slot.c:1679
bool ReplicationSlotName(int index, Name name)
Definition slot.c:590
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition slot.c:2988
void ReplicationSlotsShmemInit(void)
Definition slot.c:206
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:267
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:952
void ReplicationSlotRelease(void)
Definition slot.c:761
int max_replication_slots
Definition slot.c:151
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:145
#define SLOT_VERSION
Definition slot.c:142
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition slot.c:3240
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3092
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1300
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:860
void ReplicationSlotInitialize(void)
Definition slot.c:242
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition slot.c:1051
void StartupReplicationSlots(void)
Definition slot.c:2387
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition slot.c:1857
void CheckSlotRequirements(void)
Definition slot.c:1657
#define SLOT_MAGIC
Definition slot.c:141
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition slot.c:2205
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition slot.c:167
#define ReplicationSlotOnDiskConstantSize
Definition slot.c:129
Size ReplicationSlotsShmemSize(void)
Definition slot.c:188
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2935
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition slot.c:251
static bool IsSlotForConflictCheck(const char *name)
Definition slot.c:362
#define CONFLICT_DETECTION_SLOT
Definition slot.h:28
#define RS_INVAL_MAX_CAUSES
Definition slot.h:72
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
#define SlotIsPhysical(slot)
Definition slot.h:287
#define PG_REPLSLOT_DIR
Definition slot.h:21
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60
#define SlotIsLogical(slot)
Definition slot.h:288
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:306
@ SS_SKIP_NONE
Definition slot.h:82
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1820
#define SpinLockInit(lock)
Definition spin.h:57
#define SpinLockRelease(lock)
Definition spin.h:61
#define SpinLockAcquire(lock)
Definition spin.h:59
PGPROC * MyProc
Definition proc.c:67
PROC_HDR * ProcGlobal
Definition proc.c:70
XLogRecPtr LogStandbySnapshot(void)
Definition standby.c:1282
@ RECOVERY_CONFLICT_LOGICALSLOT
Definition standby.h:43
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition dirent.c:26
Definition pg_list.h:54
uint8 statusFlags
Definition proc.h:269
int pgxactoff
Definition proc.h:202
uint8 * statusFlags
Definition proc.h:413
ReplicationSlot replication_slots[1]
Definition slot.h:299
ReplicationSlotPersistentData slotdata
Definition slot.c:83
pg_crc32c checksum
Definition slot.c:72
ReplicationSlotPersistency persistency
Definition slot.h:106
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
XLogRecPtr candidate_xmin_lsn
Definition slot.h:229
TransactionId effective_catalog_xmin
Definition slot.h:210
slock_t mutex
Definition slot.h:183
XLogRecPtr candidate_restart_valid
Definition slot.h:230
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:238
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:284
bool in_use
Definition slot.h:186
TransactionId effective_xmin
Definition slot.h:209
bool just_dirtied
Definition slot.h:195
XLogRecPtr last_saved_restart_lsn
Definition slot.h:271
XLogRecPtr candidate_restart_lsn
Definition slot.h:231
LWLock io_in_progress_lock
Definition slot.h:216
ConditionVariable active_cv
Definition slot.h:219
TransactionId candidate_catalog_xmin
Definition slot.h:228
ProcNumber active_proc
Definition slot.h:192
ReplicationSlotPersistentData data
Definition slot.h:213
TimestampTz inactive_since
Definition slot.h:245
const char * cause_name
Definition slot.c:110
ReplicationSlotInvalidationCause cause
Definition slot.c:109
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition slot.c:101
ConditionVariable wal_confirm_rcv_cv
Definition type.h:96
Definition c.h:760
unsigned short st_mode
Definition win32_port.h:258
#define InvalidTransactionId
Definition transam.h:31
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define TransactionIdIsValid(xid)
Definition transam.h:41
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition varlena.c:2775
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
const char * name
bool am_walsender
Definition walsender.c:123
bool log_replication_commands
Definition walsender.c:133
WalSndCtlData * WalSndCtl
Definition walsender.c:117
#define stat
Definition win32_port.h:74
#define S_ISDIR(m)
Definition win32_port.h:315
#define kill(pid, sig)
Definition win32_port.h:490
bool RecoveryInProgress(void)
Definition xlog.c:6460
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3795
bool EnableHotStandby
Definition xlog.c:124
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6563
int wal_level
Definition xlog.c:134
int wal_segment_size
Definition xlog.c:146
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition xlog.c:2669
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:9598
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2783
@ WAL_LEVEL_REPLICA
Definition xlog.h:76
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint64 XLogSegNo
Definition xlogdefs.h:52
bool StandbyMode
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)