PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the directory
24 * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 * contain the slot's own data. Additional data can be stored alongside that
26 * file if required. While the server is running, the state data is also
27 * cached in memory for efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include <unistd.h>
40#include <sys/stat.h>
41
42#include "access/transam.h"
44#include "access/xlogrecovery.h"
45#include "common/file_utils.h"
46#include "common/string.h"
47#include "miscadmin.h"
48#include "pgstat.h"
52#include "replication/slot.h"
54#include "storage/fd.h"
55#include "storage/ipc.h"
56#include "storage/proc.h"
57#include "storage/procarray.h"
58#include "utils/builtins.h"
59#include "utils/guc_hooks.h"
61#include "utils/varlena.h"
62#include "utils/wait_event.h"
63
64/*
65 * Replication slot on-disk data structure.
 *
 * The size macros that follow derive the checksummed and version-dependent
 * portions from offsetof(ReplicationSlotOnDisk, version) and
 * offsetof(ReplicationSlotOnDisk, slotdata), so the field order here is part
 * of the on-disk format.
 *
 * NOTE(review): this listing drops original lines 67, 72-73, 76-77 and 84-85
 * (presumably the struct header, the field declarations and the closing type
 * name); restore them from upstream before compiling.
66 */
68{
69 /* first part of this struct needs to be version independent */
70
71 /* data not covered by checksum */
74
75 /* data covered by checksum */
78
79 /*
80 * The actual data in the slot that follows can differ based on the above
81 * 'version'.
82 */
83
86
87/*
88 * Struct for the configuration of synchronized_standby_slots.
89 *
90 * Note: this must be a flat representation that can be held in a single chunk
91 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
92 * synchronized_standby_slots GUC.
 *
 * NOTE(review): original lines 97 (presumably the 'nslotnames' counter that
 * the slot_names comment refers to) and 103 (presumably the closing brace and
 * type name) are missing from this listing.
93 */
94typedef struct
95{
96 /* Number of slot names in the slot_names[] */
98
99 /*
100 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
101 */
102 char slot_names[FLEXIBLE_ARRAY_MEMBER];
104
105/*
106 * Lookup table for slot invalidation causes.
 *
 * Each entry pairs a ReplicationSlotInvalidationCause value with a text name
 * (e.g. RS_INVAL_HORIZON maps to "rows_removed").
 *
 * NOTE(review): original lines 108-112 and 114 (presumably the entry type and
 * the array declaration) and 126 (the start of the length assertion below)
 * are missing from this listing.
107 */
113
115 {RS_INVAL_NONE, "none"},
116 {RS_INVAL_WAL_REMOVED, "wal_removed"},
117 {RS_INVAL_HORIZON, "rows_removed"},
118 {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
119 {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
120};
121
122/*
123 * Ensure that the lookup table is up-to-date with the enums defined in
124 * ReplicationSlotInvalidationCause.
125 */
127 "array length mismatch");
128
/*
 * Size helpers for ReplicationSlotOnDisk.
 *
 * Every expansion is fully parenthesized so the macros stay correct when
 * used inside larger expressions (e.g. "2 * ReplicationSlotOnDiskV2Size").
 * The unparenthesized "a - b" form would otherwise bind to the surrounding
 * operators.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	(offsetof(ReplicationSlotOnDisk, slotdata))
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	(offsetof(ReplicationSlotOnDisk, version))
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5			/* version for new files */
144
/*
 * NOTE(review): the declarations that follow each comment in this section
 * (original lines 146, 149, 159, 165, 168 and 174) are missing from this
 * listing; only max_replication_slots survives.  Restore them from upstream.
 */
145/* Control array for replication slot management */
147
148/* My backend's replication slot in the shared memory array */
150
151/* GUC variables */
152int max_replication_slots = 10; /* the maximum number of replication
153 * slots */
154
155/*
156 * Invalidate replication slots that have remained idle longer than this
157 * duration; '0' disables it.
158 */
160
161/*
162 * This GUC lists streaming replication standby server slot names that
163 * logical WAL sender processes will wait for.
164 */
166
167/* This is the parsed and cached configuration for synchronized_standby_slots */
169
170/*
171 * Oldest LSN that has been confirmed to be flushed to the standbys
172 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
173 */
175
176static void ReplicationSlotShmemExit(int code, Datum arg);
177static bool IsSlotForConflictCheck(const char *name);
178static void ReplicationSlotDropPtr(ReplicationSlot *slot);
179
180/* internal persistency functions */
181static void RestoreSlotFromDisk(const char *name);
182static void CreateSlotOnDisk(ReplicationSlot *slot);
183static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
184
185/*
186 * Report shared-memory space needed by ReplicationSlotsShmemInit.
 *
 * Returns 0 when max_replication_slots is 0.  Otherwise returns the size of
 * the ReplicationSlotCtlData header up to its replication_slots array, plus
 * whatever the add_size() call adds.
 *
 * NOTE(review): original lines 189 (the function name) and 198 (the second
 * add_size() argument, presumably the size of the per-slot array) are missing
 * from this listing.
187 */
188Size
190{
191 Size size = 0;
192
193 if (max_replication_slots == 0)
194 return size;
195
196 size = offsetof(ReplicationSlotCtlData, replication_slots);
197 size = add_size(size,
199
200 return size;
201}
202
203/*
204 * Allocate and initialize shared memory for replication slots.
 *
 * Does nothing when max_replication_slots is 0.  The per-slot initialization
 * runs only when ShmemInitStruct() reports that the "ReplicationSlot Ctl"
 * area did not already exist ('found' is false).  In that case each slot's
 * mutex is initialized.
 *
 * NOTE(review): original lines 207 (function name), 214, 223 (presumably the
 * memset the comment below refers to), 227, 230 and 232-234 are missing from
 * this listing.
205 */
206void
208{
209 bool found;
210
211 if (max_replication_slots == 0)
212 return;
213
215 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
216 &found);
217
218 if (!found)
219 {
220 int i;
221
222 /* First time through, so initialize */
224
225 for (i = 0; i < max_replication_slots; i++)
226 {
228
229 /* everything else is zeroed by the memset above */
231 SpinLockInit(&slot->mutex);
235 }
236 }
237}
238
239/*
240 * Register the callback for replication slot cleanup and releasing.
 *
 * NOTE(review): original lines 243-246 (the function name and its body) are
 * missing from this listing; only the return type survives.
241 */
242void
247
248/*
249 * Release and cleanup replication slots.
 *
 * Releases this backend's slot if it holds one (MyReplicationSlot != NULL),
 * then cleans up the temporary slots.
 *
 * NOTE(review): original lines 252 (presumably the header matching the
 * ReplicationSlotShmemExit(int code, Datum arg) prototype), 256 and 259 (the
 * release and cleanup calls) are missing from this listing.
250 */
251static void
253{
254 /* Make sure active replication slots are released */
255 if (MyReplicationSlot != NULL)
257
258 /* Also cleanup all the temporary slots. */
260}
261
262/*
263 * Check whether the passed slot name is valid and report errors at elevel.
264 *
265 * See comments for ReplicationSlotValidateNameInternal().
 *
 * Returns true if the name is valid.  Otherwise the error code, message and
 * optional hint produced by the internal check are reported at 'elevel'.  If
 * that report returns control, the palloc'd message strings are freed and
 * false is returned.
 *
 * NOTE(review): original lines 268 (function name and leading parameters),
 * 275 (presumably the ReplicationSlotValidateNameInternal() call whose
 * trailing arguments appear below), 285 and 291 are missing from this
 * listing.
266 */
267bool
269 int elevel)
270{
271 int err_code;
272 char *err_msg = NULL;
273 char *err_hint = NULL;
274
276 &err_code, &err_msg, &err_hint))
277 {
278 /*
279 * Use errmsg_internal() and errhint_internal() instead of errmsg()
280 * and errhint(), since the messages from
281 * ReplicationSlotValidateNameInternal() are already translated. This
282 * avoids double translation.
283 */
284 ereport(elevel,
286 errmsg_internal("%s", err_msg),
287 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
288
289 pfree(err_msg);
290 if (err_hint != NULL)
292 return false;
293 }
294
295 return true;
296}
297
298/*
299 * Check whether the passed slot name is valid.
300 *
301 * An error will be reported for a reserved replication slot name if
302 * allow_reserved_name is set to false.
303 *
304 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
305 * the name to be used as a directory name on every supported OS.
306 *
307 * Returns true if the slot name is valid. Otherwise, returns false and stores
308 * the error code, error message, and optional hint in err_code, err_msg, and
309 * err_hint, respectively. The caller is responsible for freeing err_msg and
310 * err_hint, which are palloc'd.
 *
 * Checks run in order: empty name, name of NAMEDATALEN bytes or more,
 * characters outside [a-z0-9_], then the reserved-name check.  The first
 * failure wins.
 *
 * NOTE(review): original lines 313 (function name and leading parameters),
 * 320, 328 and 340 (presumably the *err_code assignments), 347 (the
 * reserved-name condition), 349 and 352 are missing from this listing.
311 */
312bool
314 int *err_code, char **err_msg, char **err_hint)
315{
316 const char *cp;
317
318 if (strlen(name) == 0)
319 {
321 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
322 *err_hint = NULL;
323 return false;
324 }
325
326 if (strlen(name) >= NAMEDATALEN)
327 {
329 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
330 *err_hint = NULL;
331 return false;
332 }
333
/* Only lower-case ASCII letters, digits and '_' keep the name directory-safe. */
334 for (cp = name; *cp; cp++)
335 {
336 if (!((*cp >= 'a' && *cp <= 'z')
337 || (*cp >= '0' && *cp <= '9')
338 || (*cp == '_')))
339 {
341 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
/*
 * NOTE(review): this hint passes the translated text as the format string
 * with no arguments; pstrdup() would avoid interpreting any '%' a
 * translation might contain.
 */
342 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
343 return false;
344 }
345 }
346
348 {
350 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
351 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
353 return false;
354 }
355
356 return true;
357}
358
359/*
360 * Return true if the replication slot name is "pg_conflict_detection".
361 */
362static bool
364{
365 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
366}
367
368/*
369 * Create a new replication slot and mark it as used by this backend.
370 *
371 * name: Name of the slot
372 * db_specific: logical decoding is db specific; if the slot is going to
373 * be used for that pass true, otherwise false.
 * persistency: persistency of the new slot (stored in slot->data.persistency).
374 * two_phase: If enabled, allows decoding of prepared transactions.
375 * failover: If enabled, allows the slot to be synced to standbys so
376 * that logical replication can be resumed after failover.
377 * synced: True if the slot is synchronized from the primary server.
 *
 * On success the slot is marked in_use, becomes this backend's slot
 * (MyReplicationSlot) and has been created on disk.  Errors are raised for a
 * name collision or when no free slot is left.
 *
 * NOTE(review): numerous original lines are missing from this listing.
 * Examples: 380 (the function name and first parameters), 393 (the start of
 * the name-validation call), 431, 438, 450, 504 and 530 (presumably the lock
 * acquire/release calls described by the comments near them), the ereport()
 * openings at 406-408, 419-420, 444-445 and 454-455, and 481-488.
378 */
379void
381 ReplicationSlotPersistency persistency,
382 bool two_phase, bool failover, bool synced)
383{
384 ReplicationSlot *slot = NULL;
385 int i;
386
388
389 /*
390 * The logical launcher or pg_upgrade may create or migrate an internal
391 * slot, so using a reserved name is allowed in these cases.
392 */
394 ERROR);
395
396 if (failover)
397 {
398 /*
399 * Do not allow users to create the failover enabled slots on the
400 * standby as we do not support sync to the cascading standby.
401 *
402 * However, failover enabled slots can be created during slot
403 * synchronization because we need to retain the same values as the
404 * remote slot.
405 */
409 errmsg("cannot enable failover for a replication slot created on the standby"));
410
411 /*
412 * Do not allow users to create failover enabled temporary slots,
413 * because temporary slots will not be synced to the standby.
414 *
415 * However, failover enabled temporary slots can be created during
416 * slot synchronization. See the comments atop slotsync.c for details.
417 */
418 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
421 errmsg("cannot enable failover for a temporary replication slot"));
422 }
423
424 /*
425 * If some other backend ran this code concurrently with us, we'd likely
426 * both allocate the same slot, and that would be bad. We'd also be at
427 * risk of missing a name collision. Also, we don't want to try to create
428 * a new slot while somebody's busy cleaning up an old one, because we
429 * might both be monkeying with the same directory.
430 */
432
433 /*
434 * Check for name collision, and identify an allocatable slot. We need to
435 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
436 * else can change the in_use flags while we're looking at them.
437 */
439 for (i = 0; i < max_replication_slots; i++)
440 {
442
443 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
446 errmsg("replication slot \"%s\" already exists", name)));
447 if (!s->in_use && slot == NULL)
448 slot = s;
449 }
451
452 /* If all slots are in use, we're out of luck. */
453 if (slot == NULL)
456 errmsg("all replication slots are in use"),
457 errhint("Free one or increase \"max_replication_slots\".")));
458
459 /*
460 * Since this slot is not in use, nobody should be looking at any part of
461 * it other than the in_use field unless they're trying to allocate it.
462 * And since we hold ReplicationSlotAllocationLock, nobody except us can
463 * be doing that. So it's safe to initialize the slot.
464 */
465 Assert(!slot->in_use);
467
468 /* first initialize persistent data */
469 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
470 namestrcpy(&slot->data.name, name);
472 slot->data.persistency = persistency;
473 slot->data.two_phase = two_phase;
475 slot->data.failover = failover;
476 slot->data.synced = synced;
477
478 /* and then data only present in shared memory */
479 slot->just_dirtied = false;
480 slot->dirty = false;
489 slot->inactive_since = 0;
491
492 /*
493 * Create the slot on disk. We haven't actually marked the slot allocated
494 * yet, so no special cleanup is required if this errors out.
495 */
496 CreateSlotOnDisk(slot);
497
498 /*
499 * We need to briefly prevent any other backend from iterating over the
500 * slots while we flip the in_use flag. We also need to set the active
501 * flag while holding the ControlLock as otherwise a concurrent
502 * ReplicationSlotAcquire() could acquire the slot as well.
503 */
505
506 slot->in_use = true;
507
508 /* We can now mark the slot active, and that makes it our slot. */
509 SpinLockAcquire(&slot->mutex);
512 SpinLockRelease(&slot->mutex);
513 MyReplicationSlot = slot;
514
516
517 /*
518 * Create statistics entry for the new logical slot. We don't collect any
519 * stats for physical slots, so no need to create an entry for the same.
520 * See ReplicationSlotDropPtr for why we need to do this before releasing
521 * ReplicationSlotAllocationLock.
522 */
523 if (SlotIsLogical(slot))
525
526 /*
527 * Now that the slot has been marked as in_use and active, it's safe to
528 * let somebody else try to allocate a slot.
529 */
531
532 /* Let everybody know we've modified this slot */
534}
535
536/*
537 * Search for the named replication slot.
538 *
539 * Return the replication slot if found, otherwise NULL.
 *
 * Only slots whose in_use flag is set can match; the scan stops at the first
 * match.  When 'need_lock' is true, the statements guarded by
 * "if (need_lock)" run before and after the scan.  Those statements are
 * missing from this listing; presumably they take and release
 * ReplicationSlotControlLock.
 *
 * NOTE(review): original lines 541-542 (return type, name and parameters),
 * 548, 552 (presumably the declaration of 's') and 562 are missing from this
 * listing.
540 */
543{
544 int i;
545 ReplicationSlot *slot = NULL;
546
547 if (need_lock)
549
550 for (i = 0; i < max_replication_slots; i++)
551 {
553
554 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
555 {
556 slot = s;
557 break;
558 }
559 }
560
561 if (need_lock)
563
564 return slot;
565}
566
567/*
568 * Return the index of the replication slot in
569 * ReplicationSlotCtl->replication_slots.
570 *
571 * This is mainly useful to have an efficient key for storing replication slot
572 * stats.
 *
 * NOTE(review): original lines 575-581 (the function name and body) are
 * missing from this listing; only the return type survives.
573 */
574int
582
583/*
584 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
585 * the slot's name and true is returned.
586 *
587 * This likely is only useful for pgstat_replslot.c during shutdown, in other
588 * cases there are obvious TOCTOU issues.
 *
 * NOTE(review): original lines 591 (function name and parameters), 596
 * (presumably the lookup of 'slot'), 602 (presumably the lock the comment
 * below describes) and 605-606 (the name copy and unlock) are missing from
 * this listing.
589 */
590bool
592{
593 ReplicationSlot *slot;
594 bool found;
595
597
598 /*
599 * Ensure that the slot cannot be dropped while we copy the name. Don't
600 * need the spinlock as the name of an existing slot cannot change.
601 */
603 found = slot->in_use;
604 if (slot->in_use)
607
608 return found;
609}
610
611/*
612 * Find a previously created slot and mark it as used by this process.
613 *
614 * An error is raised if nowait is true and the slot is currently in use. If
615 * nowait is false, we sleep until the slot is released by the owning process.
616 *
617 * An error is raised if error_if_invalid is true and the slot is found to
618 * be invalid. It should always be set to true, except when we are temporarily
619 * acquiring the slot and don't intend to change it.
 *
 * An error is also raised when no in-use slot has the given name.  When the
 * calling process is a WAL sender (am_walsender), a log message naming the
 * slot as logical or physical is emitted at the end.
 *
 * NOTE(review): many original lines are missing from this listing.  Examples:
 * 624 (presumably the declaration of 's'), 636 (the slot lookup), 652-654
 * (the reserved-slot check), 662 (the condition of the activity check),
 * 677-679 and 681-682 (the spinlock-protected section), 702-704 (the wait),
 * and the ereport() openings at 641-642, 708-709 and 724-726.
620 */
621void
622ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
623{
625 ProcNumber active_proc;
626 int active_pid;
627
628 Assert(name != NULL);
629
/* Re-entered via "goto retry" after waiting for the current owner to let go. */
630retry:
632
634
635 /* Check if the slot exists with the given name. */
637 if (s == NULL || !s->in_use)
638 {
640
643 errmsg("replication slot \"%s\" does not exist",
644 name)));
645 }
646
647 /*
648 * Do not allow users to acquire the reserved slot. This scenario may
649 * occur if the launcher that owns the slot has terminated unexpectedly
650 * due to an error, and a backend process attempts to reuse the slot.
651 */
655 errmsg("cannot acquire replication slot \"%s\"", name),
656 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
657
658 /*
659 * This is the slot we want; check if it's active under some other
660 * process. In single user mode, we don't need this check.
661 */
663 {
664 /*
665 * Get ready to sleep on the slot in case it is active. (We may end
666 * up not sleeping, but we don't want to do this while holding the
667 * spinlock.)
668 */
669 if (!nowait)
671
672 /*
673 * It is important to reset the inactive_since under spinlock here to
674 * avoid race conditions with slot invalidation. See comments related
675 * to inactive_since in InvalidatePossiblyObsoleteSlot.
676 */
680 active_proc = s->active_proc;
683 }
684 else
685 {
686 s->active_proc = active_proc = MyProcNumber;
688 }
689 active_pid = GetPGProcByNumber(active_proc)->pid;
691
692 /*
693 * If we found the slot but it's already active in another process, we
694 * wait until the owning process signals us that it's been released, or
695 * error out.
696 */
697 if (active_proc != MyProcNumber)
698 {
699 if (!nowait)
700 {
701 /* Wait here until we get signaled, and then restart */
705 goto retry;
706 }
707
710 errmsg("replication slot \"%s\" is active for PID %d",
711 NameStr(s->data.name), active_pid)));
712 }
713 else if (!nowait)
714 ConditionVariableCancelSleep(); /* no sleep needed after all */
715
716 /* We made this slot active, so it's ours now. */
718
719 /*
720 * We need to check for invalidation after making the slot ours to avoid
721 * the possible race condition with the checkpointer that can otherwise
722 * invalidate the slot immediately after the check.
723 */
727 errmsg("can no longer access replication slot \"%s\"",
728 NameStr(s->data.name)),
729 errdetail("This replication slot has been invalidated due to \"%s\".",
731
732 /* Let everybody know we've modified this slot */
734
735 /*
736 * The call to pgstat_acquire_replslot() protects against stats for a
737 * different slot, from before a restart or such, being present during
738 * pgstat_report_replslot().
739 */
740 if (SlotIsLogical(s))
742
743
744 if (am_walsender)
745 {
748 ? errmsg("acquired logical replication slot \"%s\"",
749 NameStr(s->data.name))
750 : errmsg("acquired physical replication slot \"%s\"",
751 NameStr(s->data.name)));
752 }
753}
754
755/*
756 * Release the replication slot that this backend considers to own.
757 *
758 * This or another backend can re-acquire the slot later.
759 * Resources this slot requires will be preserved.
 *
 * Exception: an RS_EPHEMERAL slot is dropped here rather than preserved.  If
 * it was logical, a request to disable logical decoding is made.  When
 * running as a WAL sender, the slot name is copied up front so the
 * "released ... slot" message can still name it after release.
 *
 * NOTE(review): original lines 762, 764 (presumably the declaration of
 * 'slot'), 771, 783, 791, 801, 804, 806, 813, 822-823, 825, 828, 830,
 * 833-836 and 840-841 are missing from this listing.
760 */
761void
763{
765 char *slotname = NULL; /* keep compiler quiet */
766 bool is_logical;
767 TimestampTz now = 0;
768
769 Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
770
772
773 if (am_walsender)
774 slotname = pstrdup(NameStr(slot->data.name));
775
776 if (slot->data.persistency == RS_EPHEMERAL)
777 {
778 /*
779 * Delete the slot. There is no !PANIC case where this is allowed to
780 * fail, all that may happen is an incomplete cleanup of the on-disk
781 * data.
782 */
784
785 /*
786 * Request to disable logical decoding, even though this slot may not
787 * have been the last logical slot. The checkpointer will verify if
788 * logical decoding should actually be disabled.
789 */
790 if (is_logical)
792 }
793
794 /*
795 * If slot needed to temporarily restrain both data and catalog xmin to
796 * create the catalog snapshot, remove that temporary constraint.
797 * Snapshots can only be exported while the initial snapshot is still
798 * acquired.
799 */
800 if (!TransactionIdIsValid(slot->data.xmin) &&
802 {
803 SpinLockAcquire(&slot->mutex);
805 SpinLockRelease(&slot->mutex);
807 }
808
809 /*
810 * Set the time since the slot has become inactive. We get the current
811 * time beforehand to avoid a system call while holding the spinlock.
812 */
814
815 if (slot->data.persistency == RS_PERSISTENT)
816 {
817 /*
818 * Mark persistent slot inactive. We're not freeing it, just
819 * disconnecting, but wake up others that may be waiting for it.
820 */
821 SpinLockAcquire(&slot->mutex);
824 SpinLockRelease(&slot->mutex);
826 }
827 else
829
831
832 /* might not have been set when we've been a plain slot */
837
838 if (am_walsender)
839 {
842 ? errmsg("released logical replication slot \"%s\"",
843 slotname)
844 : errmsg("released physical replication slot \"%s\"",
845 slotname));
846
847 pfree(slotname);
848 }
849}
850
851/*
852 * Cleanup temporary slots created in current session.
853 *
854 * Cleanup only synced temporary slots if 'synced_only' is true, else
855 * cleanup all temporary slots.
856 *
857 * If it drops the last logical slot in the cluster, requests to disable
858 * logical decoding.
 *
 * Only slots whose active_proc is this backend (MyProcNumber) are
 * candidates.  After each drop the scan restarts from the first slot.
 *
 * NOTE(review): original lines 861 (function name and parameter), 864, 867,
 * 870-871, 874 (presumably the declaration of 's'), 879, 881-882, 887-888,
 * 894, 896, 900, 903 and 905-906 are missing from this listing.
859 */
860void
862{
863 int i;
865 bool dropped_logical = false;
866
868
869restart:
872 for (i = 0; i < max_replication_slots; i++)
873 {
875
876 if (!s->in_use)
877 continue;
878
880
883
884 if ((s->active_proc == MyProcNumber &&
885 (!synced_only || s->data.synced)))
886 {
889 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
890
891 if (SlotIsLogical(s))
892 dropped_logical = true;
893
895
/* ReplicationSlotControlLock was released above, so rescan from the start. */
897 goto restart;
898 }
899 else
901 }
902
904
907}
908
909/*
910 * Permanently drop replication slot identified by the passed in name.
 *
 * 'nowait' is passed through to ReplicationSlotAcquire(): when true, an error
 * is raised if the slot is in use instead of waiting for it.  Slots being
 * synchronized from the primary cannot be dropped.
 *
 * NOTE(review): original lines 917, 925-927 (the condition and ereport()
 * opening of the synced-slot check), 931, 933 and 936 are missing from this
 * listing.
911 */
912void
913ReplicationSlotDrop(const char *name, bool nowait)
914{
915 bool is_logical;
916
918
/* error_if_invalid = false, so an invalidated slot can still be dropped. */
919 ReplicationSlotAcquire(name, nowait, false);
920
921 /*
922 * Do not allow users to drop the slots which are currently being synced
923 * from the primary to the standby.
924 */
928 errmsg("cannot drop replication slot \"%s\"", name),
929 errdetail("This replication slot is being synchronized from the primary server."));
930
932
934
935 if (is_logical)
937}
938
939/*
940 * Change the definition of the slot identified by the specified name.
941 *
942 * Altering the two_phase property of a slot requires caution on the
943 * client-side. Enabling it at any random point during decoding has the
944 * risk that transactions prepared before this change may be skipped by
945 * the decoder, leading to missing prepare records on the client. So, we
946 * enable it for subscription related slots only once the initial tablesync
947 * is finished. See comments atop worker.c. Disabling it is safe only when
948 * there are no pending prepared transactions, otherwise, the changes of
949 * already prepared transactions can be replicated again along with their
950 * corresponding commit leading to duplicate data or errors.
 *
 * failover: if non-NULL, points to the requested failover setting; when NULL
 * the failover-related checks and update are skipped.
 * two_phase: presumably handled the same way (its guarding condition at
 * original line 1013 is missing from this listing).
 *
 * Only logical slots can be altered.  The slot is acquired with
 * error_if_invalid = true, so invalidated slots are rejected.
 *
 * NOTE(review): original lines 958-959, 963-965, 975-977, 986-987,
 * 998-1000, 1003, 1005-1007, 1013, 1015-1017, 1024-1025 and 1028 are missing
 * from this listing.
951 */
952void
953ReplicationSlotAlter(const char *name, const bool *failover,
954 const bool *two_phase)
955{
956 bool update_slot = false;
957
960
961 ReplicationSlotAcquire(name, false, true);
962
966 errmsg("cannot use %s with a physical replication slot",
967 "ALTER_REPLICATION_SLOT"));
968
969 if (RecoveryInProgress())
970 {
971 /*
972 * Do not allow users to alter the slots which are currently being
973 * synced from the primary to the standby.
974 */
978 errmsg("cannot alter replication slot \"%s\"", name),
979 errdetail("This replication slot is being synchronized from the primary server."));
980
981 /*
982 * Do not allow users to enable failover on the standby as we do not
983 * support sync to the cascading standby.
984 */
985 if (failover && *failover)
988 errmsg("cannot enable failover for a replication slot"
989 " on the standby"));
990 }
991
992 if (failover)
993 {
994 /*
995 * Do not allow users to enable failover for temporary slots as we do
996 * not support syncing temporary slots to the standby.
997 */
1001 errmsg("cannot enable failover for a temporary replication slot"));
1002
1004 {
1008
1009 update_slot = true;
1010 }
1011 }
1012
1014 {
1018
1019 update_slot = true;
1020 }
1021
1022 if (update_slot)
1023 {
1026 }
1027
1029}
1030
1031/*
1032 * Permanently drop the currently acquired replication slot.
 *
 * NOTE(review): original lines 1035 (function name), 1037, 1039, 1042 and
 * 1044 are missing from this listing; only comments and braces survive.
1033 */
1034void
1036{
1038
1040
1041 /* slot isn't acquired anymore */
1043
1045}
1046
1047/*
1048 * Permanently drop the replication slot which will be released by the point
1049 * this function returns.
 *
 * The slot directory under PG_REPLSLOT_DIR is first renamed to "<name>.tmp",
 * and the slot is then marked not in_use.  The renamed directory is removed
 * with rmtree(); a failure there only produces a warning-style message.  For
 * logical slots the statistics entry is dropped as well.
 *
 * NOTE(review): original lines 1052 (function name and parameter), 1062,
 * 1085, 1087-1088, 1095, 1099, 1101-1102, 1116-1117, 1119-1120, 1126-1127,
 * 1135, 1145 and 1151 are missing from this listing.  Several of them are
 * presumably the lock, fsync, wakeup and ereport() calls described by the
 * surrounding comments.
1050 */
1051static void
1053{
1054 char path[MAXPGPATH];
1055 char tmppath[MAXPGPATH];
1056
1057 /*
1058 * If some other backend ran this code concurrently with us, we might try
1059 * to delete a slot with a certain name while someone else was trying to
1060 * create a slot with the same name.
1061 */
1063
1064 /* Generate pathnames. */
/*
 * NOTE(review): slot names are shorter than NAMEDATALEN, so these presumably
 * fit in MAXPGPATH; snprintf() would make that bound explicit.
 */
1065 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1066 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1067
1068 /*
1069 * Rename the slot directory on disk, so that we'll no longer recognize
1070 * this as a valid slot. Note that if this fails, we've got to mark the
1071 * slot inactive before bailing out. If we're dropping an ephemeral or a
1072 * temporary slot, we better never fail hard as the caller won't expect
1073 * the slot to survive and this might get called during error handling.
1074 */
1075 if (rename(path, tmppath) == 0)
1076 {
1077 /*
1078 * We need to fsync() the directory we just renamed and its parent to
1079 * make sure that our changes are on disk in a crash-safe fashion. If
1080 * fsync() fails, we can't be sure whether the changes are on disk or
1081 * not. For now, we handle that by panicking;
1082 * StartupReplicationSlots() will try to straighten it out after
1083 * restart.
1084 */
1086 fsync_fname(tmppath, true);
1089 }
1090 else
1091 {
1092 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1093
1094 SpinLockAcquire(&slot->mutex);
1096 SpinLockRelease(&slot->mutex);
1097
1098 /* wake up anyone waiting on this slot */
1100
1103 errmsg("could not rename file \"%s\" to \"%s\": %m",
1104 path, tmppath)));
1105 }
1106
1107 /*
1108 * The slot is definitely gone. Lock out concurrent scans of the array
1109 * long enough to kill it. It's OK to clear the active PID here without
1110 * grabbing the mutex because nobody else can be scanning the array here,
1111 * and nobody can be attached to this slot and thus access it without
1112 * scanning the array.
1113 *
1114 * Also wake up processes waiting for it.
1115 */
1118 slot->in_use = false;
1121
1122 /*
1123 * Slot is dead and doesn't prevent resource removal anymore, recompute
1124 * limits.
1125 */
1128
1129 /*
1130 * If removing the directory fails, the worst thing that will happen is
1131 * that the user won't be able to create a new slot with the same name
1132 * until the next server restart. We warn about it, but that's all.
1133 */
1134 if (!rmtree(tmppath, true))
1136 (errmsg("could not remove directory \"%s\"", tmppath)));
1137
1138 /*
1139 * Drop the statistics entry for the replication slot. Do this while
1140 * holding ReplicationSlotAllocationLock so that we don't drop a
1141 * statistics entry for another slot with the same name just created in
1142 * another session.
1143 */
1144 if (SlotIsLogical(slot))
1146
1147 /*
1148 * We release this at the very end, so that nobody starts trying to create
1149 * a slot while we're still cleaning up the detritus of the old one.
1150 */
1152}
1153
1154/*
1155 * Serialize the currently acquired slot's state from memory to disk, thereby
1156 * guaranteeing the current state will survive a crash.
 *
 * NOTE(review): original lines 1159 (function name), 1163 and 1165-1166
 * (presumably the path construction and the save call that uses 'path') are
 * missing from this listing.
1157 */
1158void
1160{
1161 char path[MAXPGPATH];
1162
1164
1167}
1168
1169/*
1170 * Signal that it would be useful if the currently acquired slot would be
1171 * flushed out to disk.
1172 *
1173 * Note that the actual flush to disk can be delayed for a long time; if
1174 * required for correctness, explicitly do a ReplicationSlotSave().
 *
 * Sets MyReplicationSlot->dirty while holding the slot's spinlock.
 *
 * NOTE(review): original lines 1177 (function name), 1179 (presumably the
 * declaration of 'slot'), 1181 and 1184 are missing from this listing.
1175 */
1176void
1178{
1180
1182
1183 SpinLockAcquire(&slot->mutex);
1185 MyReplicationSlot->dirty = true;
1186 SpinLockRelease(&slot->mutex);
1187}
1188
1189/*
1190 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1191 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
 *
 * NOTE(review): original lines 1194 (function name), 1196 (presumably the
 * declaration of 'slot'), 1199, 1202 (presumably the persistency update made
 * under the spinlock) and 1205-1206 are missing from this listing.
1192 */
1193void
1195{
1197
1198 Assert(slot != NULL);
1200
1201 SpinLockAcquire(&slot->mutex);
1203 SpinLockRelease(&slot->mutex);
1204
1207}
1208
1209/*
1210 * Compute the oldest xmin across all slots and store it in the ProcArray.
1211 *
1212 * If already_locked is true, both the ReplicationSlotControlLock and the
1213 * ProcArrayLock have already been acquired exclusively. It is crucial that the
1214 * caller first acquires the ReplicationSlotControlLock, followed by the
1215 * ProcArrayLock, to prevent any undetectable deadlocks since this function
1216 * acquires them in that order.
 *
 * Two minimums are accumulated: agg_xmin from each slot's effective_xmin and
 * agg_catalog_xmin from each slot's effective_catalog_xmin.  Slots that are
 * not in_use or are invalidated are ignored, and invalid xids never
 * contribute.
 *
 * NOTE(review): original lines 1219 (function name and parameter), 1222-1223
 * (presumably the agg_xmin / agg_catalog_xmin declarations), 1225-1228, 1252,
 * 1256, 1264, 1268, 1276, 1282, 1287 and 1290 are missing from this listing.
1217 */
1218void
1220{
1221 int i;
1224
1229
1230 /*
1231 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1232 * values, so no backend updates the initial xmin for newly created slot
1233 * concurrently. A shared lock is used here to minimize lock contention,
1234 * especially when many slots exist and advancements occur frequently.
1235 * This is safe since an exclusive lock is taken during initial slot xmin
1236 * update in slot creation.
1237 *
1238 * One might think that we can hold the ProcArrayLock exclusively and
1239 * update the slot xmin values, but it could increase lock contention on
1240 * the ProcArrayLock, which is not great since this function can be called
1241 * at non-negligible frequency.
1242 *
1243 * Concurrent invocation of this function may cause the computed slot xmin
1244 * to regress. However, this is harmless because tuples prior to the most
1245 * recent xmin are no longer useful once advancement occurs (see
1246 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1247 * before updating the effective_xmin). Thus, such regression merely
1248 * prevents VACUUM from prematurely removing tuples without causing the
1249 * early deletion of required data.
1250 */
1251 if (!already_locked)
1253
1254 for (i = 0; i < max_replication_slots; i++)
1255 {
1257 TransactionId effective_xmin;
1258 TransactionId effective_catalog_xmin;
1259 bool invalidated;
1260
1261 if (!s->in_use)
1262 continue;
1263
1265 effective_xmin = s->effective_xmin;
1266 effective_catalog_xmin = s->effective_catalog_xmin;
1267 invalidated = s->data.invalidated != RS_INVAL_NONE;
1269
1270 /* invalidated slots need not apply */
1271 if (invalidated)
1272 continue;
1273
1274 /* check the data xmin */
1275 if (TransactionIdIsValid(effective_xmin) &&
1277 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1278 agg_xmin = effective_xmin;
1279
1280 /* check the catalog xmin */
1281 if (TransactionIdIsValid(effective_catalog_xmin) &&
1283 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1284 agg_catalog_xmin = effective_catalog_xmin;
1285 }
1286
1288
1289 if (!already_locked)
1291}
1292
1293/*
1294 * Compute the oldest restart LSN across all slots and inform xlog module.
1295 *
1296 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1297 * purpose, we don't try to account for that, because this module doesn't
1298 * know what to compare against.
 *
 * Slots that are not in_use or are invalidated are skipped.  For
 * RS_PERSISTENT slots the smaller of restart_lsn and a valid
 * last_saved_restart_lsn is used, and the minimum valid value across slots is
 * kept in min_required.
 *
 * NOTE(review): original lines 1301 (function name), 1304 (presumably the
 * min_required declaration), 1306, 1308, 1311, 1320, 1325, 1349, 1353 and 1355
 * are missing from this listing.
1299 */
1300void
1302{
1303 int i;
1305
1307
1309 for (i = 0; i < max_replication_slots; i++)
1310 {
1312 XLogRecPtr restart_lsn;
1313 XLogRecPtr last_saved_restart_lsn;
1314 bool invalidated;
1315 ReplicationSlotPersistency persistency;
1316
1317 if (!s->in_use)
1318 continue;
1319
1321 persistency = s->data.persistency;
1322 restart_lsn = s->data.restart_lsn;
1323 invalidated = s->data.invalidated != RS_INVAL_NONE;
1324 last_saved_restart_lsn = s->last_saved_restart_lsn;
1326
1327 /* invalidated slots need not apply */
1328 if (invalidated)
1329 continue;
1330
1331 /*
1332 * For persistent slot use last_saved_restart_lsn to compute the
1333 * oldest LSN for removal of WAL segments. The segments between
1334 * last_saved_restart_lsn and restart_lsn might be needed by a
1335 * persistent slot in the case of database crash. Non-persistent
1336 * slots can't survive the database crash, so we don't care about
1337 * last_saved_restart_lsn for them.
1338 */
1339 if (persistency == RS_PERSISTENT)
1340 {
1341 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1342 restart_lsn > last_saved_restart_lsn)
1343 {
1344 restart_lsn = last_saved_restart_lsn;
1345 }
1346 }
1347
1348 if (XLogRecPtrIsValid(restart_lsn) &&
1350 restart_lsn < min_required))
1351 min_required = restart_lsn;
1352 }
1354
1356}
1357
1358/*
1359 * Compute the oldest WAL LSN required by *logical* decoding slots.
1360 *
1361 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1362 * slots exist.
1363 *
 * Invalidated logical slots, and logical slots without a valid restart LSN,
 * do not contribute to the result. For persistent slots, the older of
 * restart_lsn and last_saved_restart_lsn is used.
 *
1364 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1365 * ignores physical replication slots.
1366 *
1367 * The results aren't required frequently, so we don't maintain a precomputed
1368 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1369 */
1372{
1374 int i;
1375
1376 if (max_replication_slots <= 0)
1377 return InvalidXLogRecPtr;
1378
1380
1381 for (i = 0; i < max_replication_slots; i++)
1382 {
1383 ReplicationSlot *s;
1384 XLogRecPtr restart_lsn;
1385 XLogRecPtr last_saved_restart_lsn;
1386 bool invalidated;
1387 ReplicationSlotPersistency persistency;
1388
1390
1391 /* cannot change while ReplicationSlotControlLock is held */
1392 if (!s->in_use)
1393 continue;
1394
1395 /* we're only interested in logical slots */
1396 if (!SlotIsLogical(s))
1397 continue;
1398
1399 /* read once, it's ok if it increases while we're checking */
1401 persistency = s->data.persistency;
1402 restart_lsn = s->data.restart_lsn;
1403 invalidated = s->data.invalidated != RS_INVAL_NONE;
1404 last_saved_restart_lsn = s->last_saved_restart_lsn;
1406
1407 /* invalidated slots need not apply */
1408 if (invalidated)
1409 continue;
1410
1411 /*
1412 * For persistent slots, use last_saved_restart_lsn to compute the
1413 * oldest LSN for removal of WAL segments. The segments between
1414 * last_saved_restart_lsn and restart_lsn might be needed by a
1415 * persistent slot in the case of a database crash. Non-persistent
1416 * slots can't survive a database crash, so we don't care about
1417 * last_saved_restart_lsn for them.
1418 */
1419 if (persistency == RS_PERSISTENT)
1420 {
1421 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1422 restart_lsn > last_saved_restart_lsn)
1423 {
1424 restart_lsn = last_saved_restart_lsn;
1425 }
1426 }
1427
 /* a slot without a valid restart LSN doesn't constrain the result */
1428 if (!XLogRecPtrIsValid(restart_lsn))
1429 continue;
1430
 /* keep the smallest restart LSN seen so far */
1431 if (!XLogRecPtrIsValid(result) ||
1432 restart_lsn < result)
1433 result = restart_lsn;
1434 }
1435
1437
1438 return result;
1439}
1440
1441/*
1442 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1443 * passed database oid.
1444 *
1445 * Returns true if there are any slots referencing the database. *nslots will
1446 * be set to the absolute number of slots in the database, *nactive to ones
1447 * currently active.
 *
 * Only logical slots are counted, because physical slots are not tied to a
 * database. Invalidated slots are deliberately included in the counts. If
 * max_replication_slots <= 0, both counters are set to zero and false is
 * returned.
1448 */
1449bool
1451{
1452 int i;
1453
1454 *nslots = *nactive = 0;
1455
1456 if (max_replication_slots <= 0)
1457 return false;
1458
1460 for (i = 0; i < max_replication_slots; i++)
1461 {
1462 ReplicationSlot *s;
1463
1465
1466 /* cannot change while ReplicationSlotControlLock is held */
1467 if (!s->in_use)
1468 continue;
1469
1470 /* only logical slots are database specific, skip */
1471 if (!SlotIsLogical(s))
1472 continue;
1473
1474 /* not our database, skip */
1475 if (s->data.database != dboid)
1476 continue;
1477
1478 /* NB: intentionally counting invalidated slots */
1479
1480 /* count slots with spinlock held */
1482 (*nslots)++;
1484 (*nactive)++;
1486 }
1488
1489 if (*nslots > 0)
1490 return true;
1491 return false;
1492}
1493
1494/*
1495 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1496 * passed database oid. The caller should hold an exclusive lock on the
1497 * pg_database oid for the database to prevent creation of new slots on the db
1498 * or replay from existing slots.
1499 *
1500 * Another session that concurrently acquires an existing slot on the target DB
1501 * (most likely to drop it) may cause this function to ERROR. If that happens
1502 * it may have dropped some but not all slots.
1503 *
1504 * This routine isn't as efficient as it could be - but we don't drop
1505 * databases often, especially databases with lots of slots.
1506 *
1507 * If it drops at least one slot and no valid logical slot remains anywhere
1508 * in the cluster afterwards, it requests that logical decoding be disabled.
1509 */
1510void
1512{
1513 int i;
1515 bool dropped = false;
1516
1517 if (max_replication_slots <= 0)
1518 return;
1519
1520restart:
1523 for (i = 0; i < max_replication_slots; i++)
1524 {
1525 ReplicationSlot *s;
1526 char *slotname;
1527 ProcNumber active_proc;
1528
1530
1531 /* cannot change while ReplicationSlotControlLock is held */
1532 if (!s->in_use)
1533 continue;
1534
1535 /* only logical slots are database specific, skip */
1536 if (!SlotIsLogical(s))
1537 continue;
1538
1539 /*
1540 * Check logical slots on other databases too, so that we request
1541 * disabling logical decoding only if no valid logical slots remain
 * in the cluster.
1542 */
1546
1547 /* not our database, skip */
1548 if (s->data.database != dboid)
1549 continue;
1550
1551 /* NB: intentionally including invalidated slots to drop */
1552
1553 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1555 /* can't change while ReplicationSlotControlLock is held */
1556 slotname = NameStr(s->data.name);
1557 active_proc = s->active_proc;
1558 if (active_proc == INVALID_PROC_NUMBER)
1559 {
1562 }
1564
1565 /*
1566 * Even though we hold an exclusive lock on the database object, a
1567 * logical slot for that DB can still be active, e.g. if it's
1568 * concurrently being dropped by a backend connected to another DB.
1569 *
1570 * That's fairly unlikely in practice, so we'll just bail out.
1571 *
1572 * The slot sync worker holds a shared lock on the database before
1573 * operating on synced logical slots to avoid conflict with the drop
1574 * happening here. The persistent synced slots are thus safe but there
1575 * is a possibility that the slot sync worker has created a temporary
1576 * slot (which stays active even on release) and we are trying to drop
1577 * that here. In practice, this scenario is unlikely because, during
1578 * slot synchronization, the temporary slot is immediately converted to
1579 * a persistent one, which is then protected by the shared lock taken
1580 * on the database. So, we'll just bail out in such a case.
1582 *
1583 * XXX: We can consider shutting down the slot sync worker before
1584 * trying to drop synced temporary slots here.
1585 */
1586 if (active_proc != INVALID_PROC_NUMBER)
1587 ereport(ERROR,
1589 errmsg("replication slot \"%s\" is active for PID %d",
1590 slotname, GetPGProcByNumber(active_proc)->pid)));
1591
1592 /*
1593 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1594 * holding ReplicationSlotControlLock over filesystem operations,
1595 * release ReplicationSlotControlLock and use
1596 * ReplicationSlotDropAcquired.
1597 *
1598 * As that means the set of slots could change, restart scan from the
1599 * beginning each time we release the lock.
1600 */
1603 dropped = true;
1604 goto restart;
1605 }
1607
1608 if (dropped && !found_valid_logicalslot)
1610}
1611
1612/*
1613 * Returns true if there is at least one in-use valid logical replication slot.
 *
 * "Valid" means the slot has not been invalidated. Physical slots are never
 * counted. Returns false immediately if max_replication_slots <= 0.
1614 */
1615bool
1617{
1618 bool found = false;
1619
1620 if (max_replication_slots <= 0)
1621 return false;
1622
1624 for (int i = 0; i < max_replication_slots; i++)
1625 {
1626 ReplicationSlot *s;
1627 bool invalidated;
1628
1630
1631 /* cannot change while ReplicationSlotControlLock is held */
1632 if (!s->in_use)
1633 continue;
1634
 /* only logical slots are of interest */
1635 if (SlotIsPhysical(s))
1636 continue;
1637
1639 invalidated = s->data.invalidated != RS_INVAL_NONE;
1641
1642 if (invalidated)
1643 continue;
1644
 /* a single valid logical slot is enough; stop scanning */
1645 found = true;
1646 break;
1647 }
1649
1650 return found;
1651}
1652
1653/*
1654 * Check whether the server's configuration supports using replication
1655 * slots.
 *
 * Raises an ERROR if "max_replication_slots" is 0, or if "wal_level" is
 * below "replica"; returns normally otherwise.
1656 */
1657void
1659{
1660 /*
1661 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1662 * needs the same check.
1663 */
1664
1665 if (max_replication_slots == 0)
1666 ereport(ERROR,
1668 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1669
1671 ereport(ERROR,
1673 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1674}
1675
1676/*
1677 * Check whether the user has privilege to use replication slots.
 *
 * Raises a "permission denied" ERROR for roles that lack the REPLICATION
 * attribute; returns normally otherwise.
1678 */
1679void
1681{
1683 ereport(ERROR,
1685 errmsg("permission denied to use replication slots"),
1686 errdetail("Only roles with the %s attribute may use replication slots.",
1687 "REPLICATION")));
1688}
1689
1690/*
1691 * Reserve WAL for the currently active slot.
1692 *
1693 * Compute and set restart_lsn in a manner that's appropriate for the type of
1694 * the slot and concurrency safe.
 *
 * Raises an ERROR if the WAL required from the chosen restart_lsn turns out
 * to have been removed concurrently.
1695 */
1696void
1698{
1700 XLogSegNo segno;
1701 XLogRecPtr restart_lsn;
1702
1703 Assert(slot != NULL);
1706
1707 /*
1708 * The replication slot mechanism is used to prevent the removal of
1709 * required WAL.
1710 *
1711 * Acquire an exclusive lock to prevent the checkpoint process from
1712 * concurrently computing the minimum slot LSN (see
1713 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1714 * replication cannot be removed during a checkpoint.
1715 *
1716 * The mechanism is reliable because if WAL reservation occurs first, the
1717 * checkpoint must wait for the restart_lsn update before determining the
1718 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1719 * first, subsequent WAL reservations will select positions at or beyond
1720 * the redo pointer of that checkpoint.
1721 */
1723
1724 /*
1725 * For logical slots log a standby snapshot and start logical decoding at
1726 * exactly that position. That allows the slot to start up more quickly.
1727 * But on a standby we cannot do WAL writes, so just use the replay
1728 * pointer; effectively, an attempt to create a logical slot on standby
1729 * will cause it to wait for an xl_running_xact record to be logged
1730 * independently on the primary, so that a snapshot can be built using the
1731 * record.
1732 *
1733 * None of this is needed (or indeed helpful) for physical slots as
1734 * they'll start replay at the last logged checkpoint anyway. Instead,
1735 * return the location of the last redo LSN, where a base backup has to
1736 * start replay at.
1737 */
1738 if (SlotIsPhysical(slot))
1739 restart_lsn = GetRedoRecPtr();
1740 else if (RecoveryInProgress())
1741 restart_lsn = GetXLogReplayRecPtr(NULL);
1742 else
1743 restart_lsn = GetXLogInsertRecPtr();
1744
1745 SpinLockAcquire(&slot->mutex);
1746 slot->data.restart_lsn = restart_lsn;
1747 SpinLockRelease(&slot->mutex);
1748
1749 /* prevent WAL removal as fast as possible */
1751
1752 /* Checkpoint shouldn't remove the required WAL. */
1754 if (XLogGetLastRemovedSegno() >= segno)
1755 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1756 NameStr(slot->data.name));
1757
1759
1760 if (!RecoveryInProgress() && SlotIsLogical(slot))
1761 {
1763
1764 /* make sure we have enough information to start */
1766
1767 /* and make sure it's fsynced to disk */
1769 }
1770}
1771
1772/*
1773 * Report that replication slot needs to be invalidated
 *
 * Emits a LOG message about the slot "slotname". When "terminating" is true,
 * the message says that process "pid" is being terminated to release the
 * slot; otherwise it says the slot itself is being invalidated. The
 * errdetail describes the invalidation cause. For the WAL-removal and
 * idle-timeout causes, an errhint names the GUC that might need increasing.
1774 */
1775static void
1777 bool terminating,
1778 int pid,
1779 NameData slotname,
1780 XLogRecPtr restart_lsn,
1782 TransactionId snapshotConflictHorizon,
1783 long slot_idle_seconds)
1784{
1787
1790
1791 switch (cause)
1792 {
1794 {
1795 uint64 ex = oldestLSN - restart_lsn;
1796
1798 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1799 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1800 ex),
1801 LSN_FORMAT_ARGS(restart_lsn),
1802 ex);
1803 /* translator: %s is a GUC variable name */
1804 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1805 "max_slot_wal_keep_size");
1806 break;
1807 }
1808 case RS_INVAL_HORIZON:
1809 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1810 snapshotConflictHorizon);
1811 break;
1812
1813 case RS_INVAL_WAL_LEVEL:
1814 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1815 break;
1816
1818 {
1819 /* translator: %s is a GUC variable name */
1820 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1821 slot_idle_seconds, "idle_replication_slot_timeout",
1823 /* translator: %s is a GUC variable name */
1824 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1825 "idle_replication_slot_timeout");
1826 break;
1827 }
1828 case RS_INVAL_NONE:
1830 }
1831
 /* the hint is attached only if one of the cases above filled it in */
1832 ereport(LOG,
1833 terminating ?
1834 errmsg("terminating process %d to release replication slot \"%s\"",
1835 pid, NameStr(slotname)) :
1836 errmsg("invalidating obsolete replication slot \"%s\"",
1837 NameStr(slotname)),
1838 errdetail_internal("%s", err_detail.data),
1839 err_hint.len ? errhint("%s", err_hint.data) : 0);
1840
1841 pfree(err_detail.data);
1842 pfree(err_hint.data);
1843}
1844
1845/*
1846 * Can we invalidate an idle replication slot?
1847 *
1848 * Idle timeout invalidation is allowed only when:
1849 *
1850 * 1. Idle timeout is set
1851 * 2. Slot has reserved WAL
1852 * 3. Slot is inactive
1853 * 4. The slot is not being synced from the primary while the server is in
1854 * recovery. This is because synced slots are always considered to be
1855 * inactive, as they don't perform logical decoding to produce changes.
1856 */
1857static inline bool
1865
1866/*
1867 * DetermineSlotInvalidationCause - Determine the cause for which a slot
1868 * becomes invalid among the given possible causes.
1869 *
1870 * This function sequentially checks all possible invalidation causes and
1871 * returns the first one for which the slot is eligible for invalidation.
 * The checks are made in this order: RS_INVAL_WAL_REMOVED, RS_INVAL_HORIZON,
 * RS_INVAL_WAL_LEVEL, RS_INVAL_IDLE_TIMEOUT. RS_INVAL_NONE is returned if
 * none of them applies.
 *
 * When RS_INVAL_IDLE_TIMEOUT is returned, *inactive_since is set to the
 * slot's inactive_since value. It is set to 0 instead when the timeout is
 * simulated through the "slot-timeout-inval" injection point.
1872 */
1875 XLogRecPtr oldestLSN, Oid dboid,
1876 TransactionId snapshotConflictHorizon,
1877 TimestampTz *inactive_since, TimestampTz now)
1878{
1880
1882 {
1883 XLogRecPtr restart_lsn = s->data.restart_lsn;
1884
1885 if (XLogRecPtrIsValid(restart_lsn) &&
1886 restart_lsn < oldestLSN)
1887 return RS_INVAL_WAL_REMOVED;
1888 }
1889
1891 {
1892 /* invalid DB oid signals a shared relation */
1893 if (SlotIsLogical(s) &&
1894 (dboid == InvalidOid || dboid == s->data.database))
1895 {
1896 TransactionId effective_xmin = s->effective_xmin;
1898
1899 if (TransactionIdIsValid(effective_xmin) &&
1900 TransactionIdPrecedesOrEquals(effective_xmin,
1901 snapshotConflictHorizon))
1902 return RS_INVAL_HORIZON;
1905 snapshotConflictHorizon))
1906 return RS_INVAL_HORIZON;
1907 }
1908 }
1909
1911 {
1912 if (SlotIsLogical(s))
1913 return RS_INVAL_WAL_LEVEL;
1914 }
1915
1917 {
1918 Assert(now > 0);
1919
1920 if (CanInvalidateIdleSlot(s))
1921 {
1922 /*
1923 * Simulate the invalidation due to idle_timeout to test the
1924 * timeout behavior promptly, without waiting for it to trigger
1925 * naturally.
1926 */
1927#ifdef USE_INJECTION_POINTS
1928 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1929 {
1930 *inactive_since = 0; /* since the beginning of time */
1931 return RS_INVAL_IDLE_TIMEOUT;
1932 }
1933#endif
1934
1935 /*
1936 * Check if the slot needs to be invalidated due to
1937 * idle_replication_slot_timeout GUC.
1938 */
1941 {
1942 *inactive_since = s->inactive_since;
1943 return RS_INVAL_IDLE_TIMEOUT;
1944 }
1945 }
1946 }
1947
1948 return RS_INVAL_NONE;
1949}
1950
1951/*
1952 * Helper for InvalidateObsoleteReplicationSlots
1953 *
1954 * Acquires the given slot and marks it invalid, if necessary and possible.
1955 *
1956 * Returns true if the slot was invalidated. Returns false if no invalidation
 * is needed, including when the slot is already invalid.
1957 *
1958 * Set *released_lock_out if ReplicationSlotControlLock was released in the
1959 * interim (and in that case we're not holding the lock at return, otherwise
1960 * we are).
1961 *
1962 * This is inherently racy, because we release the LWLock for syscalls, so
1963 * the caller must restart its scan whenever *released_lock_out is set. That
 * is always the case when we return true.
 *
 * If another process owns the slot, that process is signalled to terminate,
 * at most once per owning PID. We then wait for the slot to be released and
 * recheck it.
1964 */
1965static bool
1967 ReplicationSlot *s,
1969 Oid dboid, TransactionId snapshotConflictHorizon,
1970 bool *released_lock_out)
1971{
1972 int last_signaled_pid = 0;
1973 bool released_lock = false;
1974 bool invalidated = false;
1975 TimestampTz inactive_since = 0;
1976
1977 for (;;)
1978 {
1979 XLogRecPtr restart_lsn;
1980 NameData slotname;
1981 ProcNumber active_proc;
1982 int active_pid = 0;
1984 TimestampTz now = 0;
1985 long slot_idle_secs = 0;
1986
1988
1989 if (!s->in_use)
1990 {
1991 if (released_lock)
1993 break;
1994 }
1995
1997 {
1998 /*
1999 * Assign the current time here to avoid system call overhead
2000 * while holding the spinlock in subsequent code.
2001 */
2003 }
2004
2005 /*
2006 * Check if the slot needs to be invalidated. If it needs to be
2007 * invalidated, and is not currently acquired, acquire it and mark it
2008 * as having been invalidated. We do this with the spinlock held to
2009 * avoid race conditions -- for example the restart_lsn could move
2010 * forward, or the slot could be dropped.
2011 */
2013
2014 restart_lsn = s->data.restart_lsn;
2015
2016 /* we do nothing if the slot is already invalid */
2017 if (s->data.invalidated == RS_INVAL_NONE)
2019 s, oldestLSN,
2020 dboid,
2021 snapshotConflictHorizon,
2022 &inactive_since,
2023 now);
2024
2025 /* if there's no invalidation, we're done */
2027 {
2029 if (released_lock)
2031 break;
2032 }
2033
2034 slotname = s->data.name;
2035 active_proc = s->active_proc;
2036
2037 /*
2038 * If the slot can be acquired, do so and mark it invalidated
2039 * immediately. Otherwise we'll signal the owning process, below, and
2040 * retry.
2041 *
2042 * Note: Unlike other slot attributes, slot's inactive_since can't be
2043 * changed until the acquired slot is released or the owning process
2044 * is terminated. So, the inactive slot can only be invalidated
2045 * immediately without being terminated.
2046 */
2047 if (active_proc == INVALID_PROC_NUMBER)
2048 {
2052
2053 /*
2054 * XXX: We should consider not overwriting restart_lsn and instead
2055 * just rely on .invalidated.
2056 */
2058 {
2061 }
2062
2063 /* Let caller know */
2064 invalidated = true;
2065 }
2066 else
2067 {
2068 active_pid = GetPGProcByNumber(active_proc)->pid;
2069 Assert(active_pid != 0);
2070 }
2071
2073
2074 /*
2075 * Calculate the idle time duration of the slot if slot is marked
2076 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2077 */
2079 {
2080 int slot_idle_usecs;
2081
2082 TimestampDifference(inactive_since, now, &slot_idle_secs,
2084 }
2085
2086 if (active_proc != INVALID_PROC_NUMBER)
2087 {
2088 /*
2089 * Prepare the sleep on the slot's condition variable before
2090 * releasing the lock, to close a possible race condition if the
2091 * slot is released before the sleep below.
2092 */
2094
2096 released_lock = true;
2097
2098 /*
2099 * Signal to terminate the process that owns the slot, if we
2100 * haven't already signalled it. (Avoidance of repeated
2101 * signalling is the only reason for there to be a loop in this
2102 * routine; otherwise we could rely on caller's restart loop.)
2103 *
2104 * There is the race condition that other process may own the slot
2105 * after its current owner process is terminated and before this
2106 * process owns it. To handle that, we signal only if the PID of
2107 * the owning process has changed from the previous time. (This
2108 * logic assumes that the same PID is not reused very quickly.)
2109 */
2111 {
2113 slotname, restart_lsn,
2114 oldestLSN, snapshotConflictHorizon,
2116
2117 if (MyBackendType == B_STARTUP)
2119 active_pid,
2121 else
2122 (void) kill(active_pid, SIGTERM);
2123
2125 }
2126
2127 /* Wait until the slot is released. */
2130
2131 /*
2132 * Re-acquire lock and start over; we expect to invalidate the
2133 * slot next time (unless another process acquires the slot in the
2134 * meantime).
2135 *
2136 * Note: It is possible for a slot to advance its restart_lsn or
2137 * xmin values sufficiently between when we release the mutex and
2138 * when we recheck, moving from a conflicting state to a non
2139 * conflicting state. This is intentional and safe: if the slot
2140 * has caught up while we're busy here, the resources we were
2141 * concerned about (WAL segments or tuples) have not yet been
2142 * removed, and there's no reason to invalidate the slot.
2143 */
2145 continue;
2146 }
2147 else
2148 {
2149 /*
2150 * We hold the slot now and have already invalidated it; flush it
2151 * to ensure that state persists.
2152 *
2153 * Don't want to hold ReplicationSlotControlLock across file
2154 * system operations, so release it now but be sure to tell caller
2155 * to restart from scratch.
2156 */
2158 released_lock = true;
2159
2160 /* Make sure the invalidated state persists across server restart */
2164
2166 slotname, restart_lsn,
2167 oldestLSN, snapshotConflictHorizon,
2169
2170 /* done with this slot for now */
2171 break;
2172 }
2173 }
2174
2176
2178 return invalidated;
2179}
2180
2181/*
2182 * Invalidate slots that require resources about to be removed.
2183 *
2184 * Returns true if any slot has been invalidated.
2185 *
2186 * Whether a slot needs to be invalidated depends on the invalidation cause.
2187 * A slot is invalidated if it:
2188 * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
2189 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2190 * db; dboid may be InvalidOid for shared relations
2191 * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2192 * logical.
2193 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2194 * "idle_replication_slot_timeout" duration.
2195 *
2196 * Note: This function attempts to invalidate the slot for multiple possible
2197 * causes in a single pass, minimizing redundant iterations. The
2198 * "possible_causes" parameter is a bitmask of one or more of the defined
 * causes.
 *
 * Logical slots are never invalidated during a binary upgrade.
2199 *
2200 * If it invalidates a logical slot and no valid logical slots remain in the
2201 * cluster, it requests that logical decoding be disabled.
2202 *
2203 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2204 */
2205bool
2207 XLogSegNo oldestSegno, Oid dboid,
2208 TransactionId snapshotConflictHorizon)
2209{
2211 bool invalidated = false;
2212 bool invalidated_logical = false;
2214
2215 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2218
2219 if (max_replication_slots == 0)
2220 return invalidated;
2221
2223
2224restart:
2227 for (int i = 0; i < max_replication_slots; i++)
2228 {
2230 bool released_lock = false;
2231
2232 if (!s->in_use)
2233 continue;
2234
2235 /* Prevent invalidation of logical slots during binary upgrade */
2237 {
2241
2242 continue;
2243 }
2244
2246 dboid, snapshotConflictHorizon,
2247 &released_lock))
2248 {
2250
2251 /* Remember we have invalidated a physical or logical slot */
2252 invalidated = true;
2253
2254 /*
2255 * Additionally, remember we have invalidated a logical slot as we
2256 * can request disabling logical decoding later.
2257 */
2258 if (SlotIsLogical(s))
2259 invalidated_logical = true;
2260 }
2261 else
2262 {
2263 /*
2264 * We need to check if the slot is invalidated here since
2265 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2266 * is already invalidated.
2267 */
2272 }
2273
2274 /* if the lock was released, start from scratch */
2275 if (released_lock)
2276 goto restart;
2277 }
2279
2280 /*
2281 * If any slots have been invalidated, recalculate the resource limits.
2282 */
2283 if (invalidated)
2284 {
2287 }
2288
2289 /*
2290 * Request the checkpointer to disable logical decoding if no valid
2291 * logical slots remain. If called by the checkpointer during a
2292 * checkpoint, only the request is initiated; actual deactivation is
2293 * deferred until after the checkpoint completes.
2294 */
2297
2298 return invalidated;
2299}
2300
2301/*
2302 * Flush all replication slots to disk.
2303 *
2304 * It is convenient to flush dirty replication slots at the time of checkpoint.
2305 * Additionally, in case of a shutdown checkpoint, we also identify the slots
2306 * for which the confirmed_flush LSN has been updated since the last time it
2307 * was saved and flush them.
 *
 * Failures to save an individual slot are reported at LOG level rather than
 * raised as errors. If saving any slot updates its last_saved_restart_lsn,
 * the required LSN is recomputed at the end.
2308 */
2309void
2311{
2312 int i;
2313 bool last_saved_restart_lsn_updated = false;
2314
2315 elog(DEBUG1, "performing replication slot checkpoint");
2316
2317 /*
2318 * Prevent any slot from being created/dropped while we're active. As we
2319 * explicitly do *not* want to block iterating over replication_slots or
2320 * acquiring a slot we cannot take the control lock - but that's OK,
2321 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2322 * enough to guarantee that nobody can change the in_use bits on us.
2323 *
2324 * Additionally, acquiring the Allocation lock is necessary to serialize
2325 * the slot flush process with concurrent slot WAL reservation. This
2326 * ensures that the WAL position being reserved is either flushed to disk
2327 * or is beyond or equal to the redo pointer of the current checkpoint
2328 * (See ReplicationSlotReserveWal for details).
2329 */
2331
2332 for (i = 0; i < max_replication_slots; i++)
2333 {
2335 char path[MAXPGPATH];
2336
2337 if (!s->in_use)
2338 continue;
2339
2340 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2341 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2342
2343 /*
2344 * Slot's data is not flushed each time the confirmed_flush LSN is
2345 * updated as that could lead to frequent writes. However, we decide
2346 * to force a flush of all logical slots' data at the time of shutdown
2347 * if the confirmed_flush LSN has changed since we last flushed it to
2348 * disk. This helps in avoiding an unnecessary retreat of the
2349 * confirmed_flush LSN after restart. Invalidated slots are not forced.
2350 */
2351 if (is_shutdown && SlotIsLogical(s))
2352 {
2354
2355 if (s->data.invalidated == RS_INVAL_NONE &&
2357 {
2358 s->just_dirtied = true;
2359 s->dirty = true;
2360 }
2362 }
2363
2364 /*
2365 * Track if we're going to update slot's last_saved_restart_lsn. We
2366 * need this to know if we need to recompute the required LSN.
2367 */
2370
2371 SaveSlotToPath(s, path, LOG);
2372 }
2374
2375 /*
2376 * Recompute the required LSN if SaveSlotToPath() updated
2377 * last_saved_restart_lsn for any slot.
2378 */
2381}
2382
2383/*
2384 * Load all replication slots from disk into memory at server startup. This
2385 * needs to be run before we start crash recovery.
 *
 * Each directory under PG_REPLSLOT_DIR is restored as a slot. Leftover
 * directories whose names end in ".tmp" are removed instead; they come from
 * a crash while a slot was being set up or deleted. Failure to remove one is
 * only a WARNING.
2386 */
2387void
2389{
2391 struct dirent *replication_de;
2392
2393 elog(DEBUG1, "starting up replication slots");
2394
2395 /* restore all slots by iterating over all on-disk entries */
2398 {
2399 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2401
2402 if (strcmp(replication_de->d_name, ".") == 0 ||
2403 strcmp(replication_de->d_name, "..") == 0)
2404 continue;
2405
2406 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2408
2409 /* we're only creating directories here, skip if it's not ours */
2411 continue;
2412
2413 /* we crashed while a slot was being set up or deleted, clean up */
2414 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2415 {
2416 if (!rmtree(path, true))
2417 {
2419 (errmsg("could not remove directory \"%s\"",
2420 path)));
2421 continue;
2422 }
2424 continue;
2425 }
2426
2427 /* looks like a slot in a normal state, restore */
2429 }
2431
2432 /* currently no slots exist, we're done. */
2433 if (max_replication_slots <= 0)
2434 return;
2435
2436 /* Now that we have recovered all the data, compute replication xmin */
2439}
2440
2441/* ----
2442 * Manipulation of on-disk state of replication slots
2443 *
2444 * NB: none of the routines below should take any notice whether a slot is the
2445 * current one or not, that's all handled a layer above.
2446 * ----
2447 */
/*
 * Create the on-disk representation of a newly created replication slot.
 *
 * The slot directory is first built under PG_REPLSLOT_DIR/<name>.tmp, after
 * any stale directory of that name has been removed. The state file is
 * written into it, and the directory is then renamed to PG_REPLSLOT_DIR/<name>
 * and fsynced. Failure to create or rename the directory raises an ERROR.
 */
2448static void
2450{
2451 char tmppath[MAXPGPATH];
2452 char path[MAXPGPATH];
2453 struct stat st;
2454
2455 /*
2456 * No need to take out the io_in_progress_lock, nobody else can see this
2457 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2458 * takes out the lock, if we'd take the lock here, we'd deadlock.
2459 */
2460
2461 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2462 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2463
2464 /*
2465 * It's just barely possible that some previous effort to create or drop a
2466 * slot with this name left a temp directory lying around. If that seems
2467 * to be the case, try to remove it. If the rmtree() fails, we'll error
2468 * out at the MakePGDirectory() below, so we don't bother checking
2469 * success.
2470 */
2471 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2472 rmtree(tmppath, true);
2473
2474 /* Create and fsync the temporary slot directory. */
2475 if (MakePGDirectory(tmppath) < 0)
2476 ereport(ERROR,
2478 errmsg("could not create directory \"%s\": %m",
2479 tmppath)));
2480 fsync_fname(tmppath, true);
2481
2482 /* Write the actual state file. */
2483 slot->dirty = true; /* signal that we really need to write */
2485
2486 /* Rename the directory into place. */
2487 if (rename(tmppath, path) != 0)
2488 ereport(ERROR,
2490 errmsg("could not rename file \"%s\" to \"%s\": %m",
2491 tmppath, path)));
2492
2493 /*
2494 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2495 * would persist after an OS crash or not - so, force a restart. The
2496 * restart would try to fsync this again till it works.
2497 */
2499
2500 fsync_fname(path, true);
2502
2504}
2505
2506/*
2507 * Shared functionality between saving and creating a replication slot.
 *
 * If the slot is not marked dirty, this does nothing. Otherwise the slot's
 * persistent data, with a CRC32C checksum, is written to "<dir>/state.tmp".
 * That file is fsynced, closed, and renamed to "<dir>/state", and then both
 * the state file and "dir" are fsynced.
 *
 * Errors during the write sequence are reported at "elevel". When elevel is
 * below ERROR, the function just returns and the slot stays marked dirty.
 *
 * On success, the dirty flag is cleared unless the slot was dirtied again
 * concurrently. last_saved_confirmed_flush and last_saved_restart_lsn are
 * updated to the values that were written.
2508 */
2509static void
2510SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2511{
2512 char tmppath[MAXPGPATH];
2513 char path[MAXPGPATH];
2514 int fd;
2516 bool was_dirty;
2517
2518 /* first check whether there's something to write out */
2519 SpinLockAcquire(&slot->mutex);
2520 was_dirty = slot->dirty;
2521 slot->just_dirtied = false;
2522 SpinLockRelease(&slot->mutex);
2523
2524 /* and don't do anything if there's nothing to write */
2525 if (!was_dirty)
2526 return;
2527
2529
2530 /* silence valgrind :( */
2531 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2532
2533 sprintf(tmppath, "%s/state.tmp", dir);
2534 sprintf(path, "%s/state", dir);
2535
2537 if (fd < 0)
2538 {
2539 /*
2540 * If not an ERROR, then release the lock before returning. In case
2541 * of an ERROR, the error recovery path automatically releases the
2542 * lock, but no harm in explicitly releasing even in that case. Note
2543 * that LWLockRelease() could affect errno.
2544 */
2545 int save_errno = errno;
2546
2548 errno = save_errno;
2549 ereport(elevel,
2551 errmsg("could not create file \"%s\": %m",
2552 tmppath)));
2553 return;
2554 }
2555
2556 cp.magic = SLOT_MAGIC;
2557 INIT_CRC32C(cp.checksum);
2558 cp.version = SLOT_VERSION;
2560
2561 SpinLockAcquire(&slot->mutex);
2562
2563 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2564
2565 SpinLockRelease(&slot->mutex);
2566
2567 COMP_CRC32C(cp.checksum,
2570 FIN_CRC32C(cp.checksum);
2571
2572 errno = 0;
2574 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2575 {
2576 int save_errno = errno;
2577
2580 unlink(tmppath);
2582
2583 /* if write didn't set errno, assume problem is no disk space */
2585 ereport(elevel,
2587 errmsg("could not write to file \"%s\": %m",
2588 tmppath)));
2589 return;
2590 }
2592
2593 /* fsync the temporary file */
2595 if (pg_fsync(fd) != 0)
2596 {
2597 int save_errno = errno;
2598
2601 unlink(tmppath);
2603
2604 errno = save_errno;
2605 ereport(elevel,
2607 errmsg("could not fsync file \"%s\": %m",
2608 tmppath)));
2609 return;
2610 }
2612
2613 if (CloseTransientFile(fd) != 0)
2614 {
2615 int save_errno = errno;
2616
2617 unlink(tmppath);
2619
2620 errno = save_errno;
2621 ereport(elevel,
2623 errmsg("could not close file \"%s\": %m",
2624 tmppath)));
2625 return;
2626 }
2627
2628 /* rename to permanent file, fsync file and directory */
2629 if (rename(tmppath, path) != 0)
2630 {
2631 int save_errno = errno;
2632
2633 unlink(tmppath);
2635
2636 errno = save_errno;
2637 ereport(elevel,
2639 errmsg("could not rename file \"%s\" to \"%s\": %m",
2640 tmppath, path)));
2641 return;
2642 }
2643
2644 /*
2645 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2646 */
2648
2649 fsync_fname(path, false);
2650 fsync_fname(dir, true);
2652
2654
2655 /*
2656 * Successfully wrote: unset the dirty bit unless somebody dirtied the slot
2657 * again in the meantime, and remember the confirmed_flush and restart_lsn
2658 * values that were written.
 */
2659 SpinLockAcquire(&slot->mutex);
2660 if (!slot->just_dirtied)
2661 slot->dirty = false;
2662 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2663 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2664 SpinLockRelease(&slot->mutex);
2665
2667}
2668
2669/*
2670 * Load a single slot from disk into memory.
 *
 * Removes any leftover PG_REPLSLOT_DIR/<name>/state.tmp, then reads
 * PG_REPLSLOT_DIR/<name>/state. The state file and its directory are
 * fsync'd before reading. Any failure to read the file, or a bad magic
 * number, version, length or CRC, is reported at PANIC.
 *
 * A slot whose persistency is not RS_PERSISTENT is not restored; its
 * directory is removed instead. For restored slots, FATAL is raised if the
 * current wal_level / hot_standby settings cannot support the slot type,
 * or if no unused entry remains among the max_replication_slots entries.
2671 */
2672static void
2674{
2676 int i;
2677 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2678 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2679 int fd;
2680 bool restored = false;
2681 int readBytes;
2682 pg_crc32c checksum;
2683 TimestampTz now = 0;
2684
2685 /* no need to lock here, no concurrent access allowed yet */
2686
2687 /* delete temp file if it exists */
2688 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2689 sprintf(path, "%s/state.tmp", slotdir);
2690 if (unlink(path) < 0 && errno != ENOENT)
2691 ereport(PANIC,
2693 errmsg("could not remove file \"%s\": %m", path)));
2694
2695 sprintf(path, "%s/state", slotdir);
2696
2697 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2698
2699 /* on some operating systems fsyncing a file requires O_RDWR */
2701
2702 /*
2703 * We do not need to handle this as we are rename()ing the directory into
2704 * place only after we fsync()ed the state file.
2705 */
2706 if (fd < 0)
2707 ereport(PANIC,
2709 errmsg("could not open file \"%s\": %m", path)));
2710
2711 /*
2712 * Sync state file before we're reading from it. We might have crashed
2713 * while it wasn't synced yet and we shouldn't continue on that basis.
2714 */
2716 if (pg_fsync(fd) != 0)
2717 ereport(PANIC,
2719 errmsg("could not fsync file \"%s\": %m",
2720 path)));
2722
2723 /* Also sync the parent directory */
2725 fsync_fname(slotdir, true);
2727
2728 /* read part of statefile that's guaranteed to be version independent */
2733 {
2734 if (readBytes < 0)
2735 ereport(PANIC,
2737 errmsg("could not read file \"%s\": %m", path)));
2738 else
2739 ereport(PANIC,
2741 errmsg("could not read file \"%s\": read %d of %zu",
2742 path, readBytes,
2744 }
2745
2746 /* verify magic */
2747 if (cp.magic != SLOT_MAGIC)
2748 ereport(PANIC,
2750 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2751 path, cp.magic, SLOT_MAGIC)));
2752
2753 /* verify version */
2754 if (cp.version != SLOT_VERSION)
2755 ereport(PANIC,
2757 errmsg("replication slot file \"%s\" has unsupported version %u",
2758 path, cp.version)));
2759
2760 /* boundary check on length */
2761 if (cp.length != ReplicationSlotOnDiskV2Size)
2762 ereport(PANIC,
2764 errmsg("replication slot file \"%s\" has corrupted length %u",
2765 path, cp.length)));
2766
2767 /* Now that we know the size, read the entire file */
2769 readBytes = read(fd,
2771 cp.length);
2773 if (readBytes != cp.length)
2774 {
2775 if (readBytes < 0)
2776 ereport(PANIC,
2778 errmsg("could not read file \"%s\": %m", path)));
2779 else
2780 ereport(PANIC,
2782 errmsg("could not read file \"%s\": read %d of %zu",
2783 path, readBytes, (Size) cp.length)));
2784 }
2785
2786 if (CloseTransientFile(fd) != 0)
2787 ereport(PANIC,
2789 errmsg("could not close file \"%s\": %m", path)));
2790
2791 /* now verify the CRC */
2792 INIT_CRC32C(checksum);
2793 COMP_CRC32C(checksum,
2796 FIN_CRC32C(checksum);
2797
2798 if (!EQ_CRC32C(checksum, cp.checksum))
2799 ereport(PANIC,
2800 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2801 path, checksum, cp.checksum)));
2802
2803 /*
2804 * If we crashed with an ephemeral slot active, don't restore but delete
2805 * it.
2806 */
2807 if (cp.slotdata.persistency != RS_PERSISTENT)
2808 {
2809 if (!rmtree(slotdir, true))
2810 {
2812 (errmsg("could not remove directory \"%s\"",
2813 slotdir)));
2814 }
2816 return;
2817 }
2818
2819 /*
2820 * Verify that requirements for the specific slot type are met. That's
2821 * important because if these aren't met we're not guaranteed to retain
2822 * all the necessary resources for the slot.
2823 *
2824 * NB: We have to do so *after* the above checks for ephemeral slots,
2825 * because otherwise a slot that shouldn't exist anymore could prevent
2826 * restarts.
2827 *
2828 * NB: Changing the requirements here also requires adapting
2829 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2830 */
2831 if (cp.slotdata.database != InvalidOid)
2832 {
2834 ereport(FATAL,
2836 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2837 NameStr(cp.slotdata.name)),
2838 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2839
2840 /*
2841 * In standby mode, hot standby must be enabled. This check is
2842 * necessary to ensure logical slots are invalidated when they become
2843 * incompatible due to insufficient wal_level. Otherwise, if the
2844 * primary lowered effective_wal_level below logical (disabling
2845 * logical decoding) while hot standby was disabled here, the
2846 * logical slots would remain valid even after promotion.
2847 */
2849 ereport(FATAL,
2851 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2852 NameStr(cp.slotdata.name)),
2853 errhint("Change \"hot_standby\" to be \"on\".")));
2854 }
2855 else if (wal_level < WAL_LEVEL_REPLICA)
2856 ereport(FATAL,
2858 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2859 NameStr(cp.slotdata.name)),
2860 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2861
2862 /* nothing can be active yet, don't lock anything */
 /* Claim the first slot entry that is not already in_use. */
2863 for (i = 0; i < max_replication_slots; i++)
2864 {
2865 ReplicationSlot *slot;
2866
2868
2869 if (slot->in_use)
2870 continue;
2871
2872 /* restore the entire set of persistent data */
2873 memcpy(&slot->data, &cp.slotdata,
2875
2876 /* initialize in memory state */
2877 slot->effective_xmin = cp.slotdata.xmin;
2878 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2879 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2880 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2881
2886
2887 slot->in_use = true;
2889
2890 /*
2891 * Set the time since the slot has become inactive after loading the
2892 * slot from the disk into memory. Whoever acquires the slot i.e.
2893 * makes the slot active will reset it. Use the same inactive_since
2894 * time for all the slots.
2895 */
2896 if (now == 0)
2898
2900
2901 restored = true;
2902 break;
2903 }
2904
2905 if (!restored)
2906 ereport(FATAL,
2907 (errmsg("too many replication slots active before shutdown"),
2908 errhint("Increase \"max_replication_slots\" and try again.")));
2909}
2910
2911/*
2912 * Maps an invalidation reason for a replication slot to
2913 * ReplicationSlotInvalidationCause.
 *
 * cause_name must not be NULL. The SlotInvalidationCauses table is
 * searched for an entry whose cause_name matches exactly (strcmp). An
 * unknown name trips Assert(false) in assert-enabled builds; otherwise
 * RS_INVAL_NONE is returned.
 *
 * The loop bound is inclusive (i <= RS_INVAL_MAX_CAUSES). This presumably
 * relies on the table holding RS_INVAL_NONE plus RS_INVAL_MAX_CAUSES
 * entries; confirm against the table definition before changing it.
2914 */
2916GetSlotInvalidationCause(const char *cause_name)
2917{
2918 Assert(cause_name);
2919
2920 /* Search lookup table for the cause having this name */
2921 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2922 {
2923 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2925 }
2926
2927 Assert(false);
2928 return RS_INVAL_NONE; /* to keep compiler quiet */
2929}
2930
2931/*
2932 * Maps a ReplicationSlotInvalidationCause to the invalidation
2933 * reason for a replication slot.
 *
 * The SlotInvalidationCauses table is searched for the entry whose cause
 * equals the argument. A cause with no entry trips Assert(false) in
 * assert-enabled builds; otherwise the string "none" is returned. The
 * inclusive loop bound matches the one used by GetSlotInvalidationCause().
2934 */
2935const char *
2937{
2938 /* Search lookup table for the name of this cause */
2939 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2940 {
2941 if (SlotInvalidationCauses[i].cause == cause)
2943 }
2944
2945 Assert(false);
2946 return "none"; /* to keep compiler quiet */
2947}
2948
2949/*
2950 * A helper function to validate slots specified in GUC synchronized_standby_slots.
2951 *
2952 * The rawname will be parsed, and the result will be saved into *elemlist.
 *
 * Only the list syntax and the validity of each entry as a slot name are
 * checked here; the slots are not looked up, so a well-formed name of a
 * slot that does not exist is accepted.
 *
 * Returns false on the first problem found, after reporting it through
 * GUC_check_errdetail (plus a hint when the name validator supplies one).
 * Returns true otherwise.
2953 */
2954static bool
2956{
2957 /* Verify syntax and parse string into a list of identifiers */
2959 {
2960 GUC_check_errdetail("List syntax is invalid.");
2961 return false;
2962 }
2963
2964 /* Iterate the list to validate each slot name */
2965 foreach_ptr(char, name, *elemlist)
2966 {
2967 int err_code;
2968 char *err_msg = NULL;
2969 char *err_hint = NULL;
2970
2972 &err_msg, &err_hint))
2973 {
2975 GUC_check_errdetail("%s", err_msg);
2976 if (err_hint != NULL)
2978 return false;
2979 }
2980 }
2981
2982 return true;
2983}
2984
2985/*
2986 * GUC check_hook for synchronized_standby_slots
 *
 * An empty value is accepted immediately, without building any extra data.
 * Otherwise a modifiable copy of the value is parsed and validated. On
 * success with a non-empty list, *extra is set to a guc_malloc'd
 * SyncStandbySlotsConfigData. That struct holds nslotnames followed by the
 * slot names packed back to back in slot_names, each NUL-terminated.
 *
 * Returns false if validation fails or the guc_malloc allocation fails.
 *
 * NOTE(review): the guc_malloc failure path returns without pfree'ing
 * rawname, unlike the other exits. Confirm whether the memory context in
 * which check hooks run makes this harmless.
2987 */
2988bool
2990{
2991 char *rawname;
2992 char *ptr;
2993 List *elemlist;
2994 int size;
2995 bool ok;
2997
2998 if ((*newval)[0] == '\0')
2999 return true;
3000
3001 /* Need a modifiable copy of the GUC string */
3003
3004 /* Parse the list and validate its syntax and each slot name */
3006
3007 if (!ok || elemlist == NIL)
3008 {
3009 pfree(rawname);
3011 return ok;
3012 }
3013
3014 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3015 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3016 foreach_ptr(char, slot_name, elemlist)
3017 size += strlen(slot_name) + 1;
3018
3019 /* GUC extra value must be guc_malloc'd, not palloc'd */
3020 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3021 if (!config)
3022 return false;
3023
3024 /* Transform the data into SyncStandbySlotsConfigData */
3025 config->nslotnames = list_length(elemlist);
3026
3027 ptr = config->slot_names;
3028 foreach_ptr(char, slot_name, elemlist)
3029 {
3030 strcpy(ptr, slot_name);
3031 ptr += strlen(slot_name) + 1;
3032 }
3033
3034 *extra = config;
3035
3036 pfree(rawname);
3038 return true;
3039}
3040
3041/*
3042 * GUC assign_hook for synchronized_standby_slots
 *
 * 'extra' is presumably the SyncStandbySlotsConfigData built by
 * check_synchronized_standby_slots, or NULL for an empty value, since the
 * check hook does not set it in that case. Confirm against the GUC
 * machinery.
3043 */
3044void
3046{
3047 /*
3048 * The standby slots may have changed, so we must recompute the oldest
3049 * LSN.
3050 */
3052
3054}
3055
3056/*
3057 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
 *
 * Each of the nslotnames configured names is compared with slot_name using
 * an exact, case-sensitive strcmp. Returns false when
 * synchronized_standby_slots has no value.
3058 */
3059bool
3060SlotExistsInSyncStandbySlots(const char *slot_name)
3061{
3062 const char *standby_slot_name;
3063
3064 /* Return false if there is no value in synchronized_standby_slots */
3066 return false;
3067
3068 /*
3069 * XXX: We are not expecting this list to be long so a linear search
3070 * shouldn't hurt but if that turns out not to be true then we can cache
3071 * this information for each WalSender as well.
3072 */
3074 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3075 {
3076 if (strcmp(standby_slot_name, slot_name) == 0)
3077 return true;
3078
3080 }
3081
3082 return false;
3083}
3084
3085/*
3086 * Return true if the slots specified in synchronized_standby_slots have caught up to
3087 * the given WAL location, false otherwise.
3088 *
3089 * The elevel parameter specifies the error level used for logging messages
3090 * related to slots that do not exist, are invalidated, or are inactive.
 *
 * Returns true without scanning in three cases:
 * - synchronized_standby_slots is empty;
 * - recovery is in progress;
 * - the standbys are already known to be beyond wait_for_lsn.
 *
 * Otherwise the configured slots are scanned in order while holding
 * ReplicationSlotControlLock. The scan stops at the first slot that is
 * missing, logical, invalidated, or whose restart_lsn is invalid or behind
 * wait_for_lsn, so at most one such message is emitted per call. The
 * minimum restart_lsn of the caught-up slots is tracked. The cached
 * ss_oldest_flush_lsn must never move backwards.
3091 */
3092bool
3094{
3095 const char *name;
3096 int caught_up_slot_num = 0;
3098
3099 /*
3100 * Don't need to wait for the standbys to catch up if there is no value in
3101 * synchronized_standby_slots.
3102 */
3104 return true;
3105
3106 /*
3107 * Don't need to wait for the standbys to catch up if we are on a standby
3108 * server, since we do not support syncing slots to cascading standbys.
3109 */
3110 if (RecoveryInProgress())
3111 return true;
3112
3113 /*
3114 * Don't need to wait for the standbys to catch up if they are already
3115 * beyond the specified WAL location.
3116 */
3119 return true;
3120
3121 /*
3122 * To prevent concurrent slot dropping and creation while filtering the
3123 * slots, take the ReplicationSlotControlLock outside of the loop.
3124 */
3126
3128 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3129 {
3130 XLogRecPtr restart_lsn;
3131 bool invalidated;
3132 bool inactive;
3133 ReplicationSlot *slot;
3134
3135 slot = SearchNamedReplicationSlot(name, false);
3136
3137 /*
3138 * If a slot name provided in synchronized_standby_slots does not
3139 * exist, report a message and exit the loop.
3140 */
3141 if (!slot)
3142 {
3143 ereport(elevel,
3145 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3146 name, "synchronized_standby_slots"),
3147 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3148 name),
3149 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3150 name, "synchronized_standby_slots"));
3151 break;
3152 }
3153
3154 /* Same as above: if a slot is not physical, exit the loop. */
3155 if (SlotIsLogical(slot))
3156 {
3157 ereport(elevel,
3159 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3160 name, "synchronized_standby_slots"),
3161 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3162 name),
3163 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3164 name, "synchronized_standby_slots"));
3165 break;
3166 }
3167
3168 SpinLockAcquire(&slot->mutex);
3169 restart_lsn = slot->data.restart_lsn;
3170 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3172 SpinLockRelease(&slot->mutex);
3173
3174 if (invalidated)
3175 {
3176 /* Specified physical slot has been invalidated */
3177 ereport(elevel,
3179 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3180 name, "synchronized_standby_slots"),
3181 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3182 name),
3183 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3184 name, "synchronized_standby_slots"));
3185 break;
3186 }
3187
3188 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3189 {
3190 /* Log a message if no active_pid for this physical slot */
3191 if (inactive)
3192 ereport(elevel,
3194 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3195 name, "synchronized_standby_slots"),
3196 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3197 name),
3198 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3199 name, "synchronized_standby_slots"));
3200
3201 /* Stop scanning: this slot hasn't caught up, so not all have. */
3202 break;
3203 }
3204
3205 Assert(restart_lsn >= wait_for_lsn);
3206
3208 min_restart_lsn > restart_lsn)
3209 min_restart_lsn = restart_lsn;
3210
3212
 /* Advance past this name's NUL terminator to the next packed name. */
3213 name += strlen(name) + 1;
3214 }
3215
3217
3218 /*
3219 * Return false if not all the standbys have caught up to the specified
3220 * WAL location.
3221 */
3223 return false;
3224
3225 /* The ss_oldest_flush_lsn must not retreat. */
3228
3230
3231 return true;
3232}
3233
3234/*
3235 * Wait for physical standbys to confirm receiving the given lsn.
3236 *
3237 * Used by logical decoding SQL functions. It waits for physical standbys
3238 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 *
 * The wait loops until every configured standby has caught up. A pending
 * configuration reload (ConfigReloadPending) is handled on each iteration,
 * so a change to synchronized_standby_slots takes effect while waiting.
 * Each sleep is bounded by a 1s timeout for the same reason.
3239 */
3240void
3242{
3243 /*
3244 * Don't need to wait for the standby to catch up if the current acquired
3245 * slot is not a logical failover slot, or there is no value in
3246 * synchronized_standby_slots.
3247 */
3249 return;
3250
3252
3253 for (;;)
3254 {
3256
3258 {
3259 ConfigReloadPending = false;
3261 }
3262
3263 /* Exit if done waiting for every slot. */
3265 break;
3266
3267 /*
3268 * Wait for the slots in the synchronized_standby_slots to catch up,
3269 * but use a timeout (1s) so we can also check if the
3270 * synchronized_standby_slots has been changed.
3271 */
3274 }
3275
3277}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition timestamp.c:1719
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition timestamp.c:1793
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1643
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1607
#define NameStr(name)
Definition c.h:798
#define ngettext(s, p, n)
Definition c.h:1233
#define Assert(condition)
Definition c.h:906
#define PG_BINARY
Definition c.h:1337
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:513
uint64_t uint64
Definition c.h:580
#define pg_unreachable()
Definition c.h:353
uint32_t uint32
Definition c.h:579
#define lengthof(array)
Definition c.h:836
#define MemSet(start, val, len)
Definition c.h:1070
#define StaticAssertDecl(condition, errmessage)
Definition c.h:971
uint32 TransactionId
Definition c.h:699
size_t Size
Definition c.h:652
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1322
int errcode_for_file_access(void)
Definition elog.c:897
int errcode(int sqlerrcode)
Definition elog.c:874
#define _(x)
Definition elog.c:95
#define LOG
Definition elog.h:31
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define FATAL
Definition elog.h:41
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:36
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int int errhint_internal(const char *fmt,...) pg_attribute_printf(1
int MakePGDirectory(const char *directoryName)
Definition fd.c:3963
int FreeDir(DIR *dir)
Definition fd.c:3009
int CloseTransientFile(int fd)
Definition fd.c:2855
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:757
DIR * AllocateDir(const char *dirname)
Definition fd.c:2891
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2957
int pg_fsync(int fd)
Definition fd.c:390
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_DIR
Definition file_utils.h:23
@ PGFILETYPE_ERROR
Definition file_utils.h:20
bool IsBinaryUpgrade
Definition globals.c:121
ProcNumber MyProcNumber
Definition globals.c:90
bool IsUnderPostmaster
Definition globals.c:120
Oid MyDatabaseId
Definition globals.c:94
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
void GUC_check_errcode(int sqlerrcode)
Definition guc.c:6628
void * guc_malloc(int elevel, size_t size)
Definition guc.c:636
#define newval
#define GUC_check_errdetail
Definition guc.h:506
GucSource
Definition guc.h:112
@ PGC_SIGHUP
Definition guc.h:75
#define GUC_check_errhint
Definition guc.h:510
#define IS_INJECTION_POINT_ATTACHED(name)
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int i
Definition isn.c:77
bool IsLogicalLauncher(void)
Definition launcher.c:1587
void list_free(List *list)
Definition list.c:1546
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:434
bool LWLockHeldByMe(LWLock *lock)
Definition lwlock.c:1912
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1956
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:699
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
#define START_CRIT_SECTION()
Definition miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
@ B_STARTUP
Definition miscadmin.h:365
#define END_CRIT_SECTION()
Definition miscadmin.h:152
Oid GetUserId(void)
Definition miscinit.c:470
BackendType MyBackendType
Definition miscinit.c:65
bool has_rolreplication(Oid roleid)
Definition miscinit.c:689
void namestrcpy(Name name, const char *str)
Definition name.c:233
static char * errmsg
#define ERRCODE_DATA_CORRUPTED
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define EQ_CRC32C(c1, c2)
Definition pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static bool two_phase
static bool failover
static rewind_source * source
Definition pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition port.h:262
#define snprintf
Definition port.h:260
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
#define GetPGProcByNumber(n)
Definition proc.h:504
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition procarray.c:3955
bool SignalRecoveryConflict(PGPROC *proc, pid_t pid, RecoveryConflictReason reason)
Definition procarray.c:3459
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition slot.c:575
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:622
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition slot.c:1966
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition slot.c:114
char * synchronized_standby_slots
Definition slot.c:165
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition slot.c:3045
#define ReplicationSlotOnDiskChecksummedSize
Definition slot.c:136
void CheckPointReplicationSlots(bool is_shutdown)
Definition slot.c:2310
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition slot.c:380
int idle_replication_slot_timeout_secs
Definition slot.c:159
void ReplicationSlotDropAcquired(void)
Definition slot.c:1035
void ReplicationSlotMarkDirty(void)
Definition slot.c:1177
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition slot.c:1874
void ReplicationSlotReserveWal(void)
Definition slot.c:1697
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition slot.c:1450
static XLogRecPtr ss_oldest_flush_lsn
Definition slot.c:174
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:313
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition slot.c:1511
#define ReplicationSlotOnDiskNotChecksummedSize
Definition slot.c:133
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1371
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition slot.c:2916
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1219
static void RestoreSlotFromDisk(const char *name)
Definition slot.c:2673
void ReplicationSlotPersist(void)
Definition slot.c:1194
bool CheckLogicalSlotExists(void)
Definition slot.c:1616
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition slot.c:1776
ReplicationSlot * MyReplicationSlot
Definition slot.c:149
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition slot.c:2510
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:913
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3060
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition slot.c:2955
void ReplicationSlotSave(void)
Definition slot.c:1159
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:542
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition slot.c:2449
#define ReplicationSlotOnDiskV2Size
Definition slot.c:139
void CheckSlotPermissions(void)
Definition slot.c:1680
bool ReplicationSlotName(int index, Name name)
Definition slot.c:591
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition slot.c:2989
void ReplicationSlotsShmemInit(void)
Definition slot.c:207
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:268
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:953
void ReplicationSlotRelease(void)
Definition slot.c:762
int max_replication_slots
Definition slot.c:152
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:146
#define SLOT_VERSION
Definition slot.c:143
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition slot.c:3241
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3093
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1301
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:861
void ReplicationSlotInitialize(void)
Definition slot.c:243
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition slot.c:1052
void StartupReplicationSlots(void)
Definition slot.c:2388
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition slot.c:1858
void CheckSlotRequirements(void)
Definition slot.c:1658
#define SLOT_MAGIC
Definition slot.c:142
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition slot.c:2206
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition slot.c:168
#define ReplicationSlotOnDiskConstantSize
Definition slot.c:130
Size ReplicationSlotsShmemSize(void)
Definition slot.c:189
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2936
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition slot.c:252
static bool IsSlotForConflictCheck(const char *name)
Definition slot.c:363
#define CONFLICT_DETECTION_SLOT
Definition slot.h:28
#define RS_INVAL_MAX_CAUSES
Definition slot.h:72
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
#define SlotIsPhysical(slot)
Definition slot.h:287
#define PG_REPLSLOT_DIR
Definition slot.h:21
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60
#define SlotIsLogical(slot)
Definition slot.h:288
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:306
@ SS_SKIP_NONE
Definition slot.h:82
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1825
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
PGPROC * MyProc
Definition proc.c:68
PROC_HDR * ProcGlobal
Definition proc.c:71
XLogRecPtr LogStandbySnapshot(void)
Definition standby.c:1283
@ RECOVERY_CONFLICT_LOGICALSLOT
Definition standby.h:43
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition dirent.c:26
Definition pg_list.h:54
uint8 statusFlags
Definition proc.h:202
int pgxactoff
Definition proc.h:199
uint8 * statusFlags
Definition proc.h:456
ReplicationSlot replication_slots[1]
Definition slot.h:299
ReplicationSlotPersistentData slotdata
Definition slot.c:84
pg_crc32c checksum
Definition slot.c:73
ReplicationSlotPersistency persistency
Definition slot.h:106
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
XLogRecPtr candidate_xmin_lsn
Definition slot.h:229
TransactionId effective_catalog_xmin
Definition slot.h:210
slock_t mutex
Definition slot.h:183
XLogRecPtr candidate_restart_valid
Definition slot.h:230
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:238
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:284
bool in_use
Definition slot.h:186
TransactionId effective_xmin
Definition slot.h:209
bool just_dirtied
Definition slot.h:195
XLogRecPtr last_saved_restart_lsn
Definition slot.h:271
XLogRecPtr candidate_restart_lsn
Definition slot.h:231
LWLock io_in_progress_lock
Definition slot.h:216
ConditionVariable active_cv
Definition slot.h:219
TransactionId candidate_catalog_xmin
Definition slot.h:228
ProcNumber active_proc
Definition slot.h:192
ReplicationSlotPersistentData data
Definition slot.h:213
TimestampTz inactive_since
Definition slot.h:245
const char * cause_name
Definition slot.c:111
ReplicationSlotInvalidationCause cause
Definition slot.c:110
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition slot.c:102
ConditionVariable wal_confirm_rcv_cv
Definition type.h:96
Definition c.h:793
unsigned short st_mode
Definition win32_port.h:258
#define InvalidTransactionId
Definition transam.h:31
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define TransactionIdIsValid(xid)
Definition transam.h:41
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition varlena.c:2775
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
const char * name
bool am_walsender
Definition walsender.c:124
bool log_replication_commands
Definition walsender.c:134
WalSndCtlData * WalSndCtl
Definition walsender.c:118
#define stat
Definition win32_port.h:74
#define S_ISDIR(m)
Definition win32_port.h:315
#define kill(pid, sig)
Definition win32_port.h:490
bool RecoveryInProgress(void)
Definition xlog.c:6444
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3779
bool EnableHotStandby
Definition xlog.c:125
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6547
int wal_level
Definition xlog.c:135
int wal_segment_size
Definition xlog.c:147
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition xlog.c:2653
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:9586
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2767
@ WAL_LEVEL_REPLICA
Definition xlog.h:76
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint64 XLogSegNo
Definition xlogdefs.h:52
bool StandbyMode
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)