PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slot.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the directory
24 * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 * contain the slot's own data. Additional data can be stored alongside that
26 * file if required. While the server is running, the state data is also
27 * cached in memory for efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include <unistd.h>
40#include <sys/stat.h>
41
42#include "access/transam.h"
44#include "access/xlogrecovery.h"
45#include "common/file_utils.h"
46#include "common/string.h"
47#include "miscadmin.h"
48#include "pgstat.h"
52#include "replication/slot.h"
54#include "storage/fd.h"
55#include "storage/ipc.h"
56#include "storage/proc.h"
57#include "storage/procarray.h"
58#include "utils/builtins.h"
59#include "utils/guc_hooks.h"
61#include "utils/varlena.h"
62
63/*
64 * Replication slot on-disk data structure.
65 */
67{
68 /* first part of this struct needs to be version independent */
69
70 /* data not covered by checksum */
73
74 /* data covered by checksum */
77
78 /*
79 * The actual data in the slot that follows can differ based on the above
80 * 'version'.
81 */
82
85
86/*
87 * Struct for the configuration of synchronized_standby_slots.
88 *
89 * Note: this must be a flat representation that can be held in a single chunk
90 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
91 * synchronized_standby_slots GUC.
92 */
93typedef struct
94{
95 /* Number of slot names in the slot_names[] */
97
98 /*
99 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
100 */
101 char slot_names[FLEXIBLE_ARRAY_MEMBER];
103
104/*
105 * Lookup table for slot invalidation causes.
106 */
112
114 {RS_INVAL_NONE, "none"},
115 {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 {RS_INVAL_HORIZON, "rows_removed"},
117 {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119};
120
121/*
122 * Ensure that the lookup table is up-to-date with the enums defined in
123 * ReplicationSlotInvalidationCause.
124 */
126 "array length mismatch");
127
/*
 * Sizes of the various parts of ReplicationSlotOnDisk.  Each expansion is
 * fully parenthesized so that the macros act as a single operand inside any
 * surrounding expression (e.g. "2 * ReplicationSlotOnDiskChecksummedSize").
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
143
144/* Control array for replication slot management */
146
147/* My backend's replication slot in the shared memory array */
149
/* GUC variables */

/* the maximum number of replication slots */
int			max_replication_slots = 10;
153
154/*
155 * Invalidate replication slots that have remained idle longer than this
156 * duration; '0' disables it.
157 */
159
160/*
161 * This GUC lists streaming replication standby server slot names that
162 * logical WAL sender processes will wait for.
163 */
165
166/* This is the parsed and cached configuration for synchronized_standby_slots */
168
169/*
170 * Oldest LSN that has been confirmed to be flushed to the standbys
171 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
172 */
174
175static void ReplicationSlotShmemExit(int code, Datum arg);
176static bool IsSlotForConflictCheck(const char *name);
177static void ReplicationSlotDropPtr(ReplicationSlot *slot);
178
179/* internal persistency functions */
180static void RestoreSlotFromDisk(const char *name);
181static void CreateSlotOnDisk(ReplicationSlot *slot);
182static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183
184/*
185 * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 */
187Size
189{
190 Size size = 0;
191
192 if (max_replication_slots == 0)
193 return size;
194
195 size = offsetof(ReplicationSlotCtlData, replication_slots);
196 size = add_size(size,
198
199 return size;
200}
201
202/*
203 * Allocate and initialize shared memory for replication slots.
204 */
205void
207{
208 bool found;
209
210 if (max_replication_slots == 0)
211 return;
212
214 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 &found);
216
217 if (!found)
218 {
219 int i;
220
221 /* First time through, so initialize */
223
224 for (i = 0; i < max_replication_slots; i++)
225 {
227
228 /* everything else is zeroed by the memset above */
229 SpinLockInit(&slot->mutex);
233 }
234 }
235}
236
237/*
238 * Register the callback for replication slot cleanup and releasing.
239 */
240void
245
246/*
247 * Release and cleanup replication slots.
248 */
249static void
251{
252 /* Make sure active replication slots are released */
253 if (MyReplicationSlot != NULL)
255
256 /* Also cleanup all the temporary slots. */
258}
259
260/*
261 * Check whether the passed slot name is valid and report errors at elevel.
262 *
263 * See comments for ReplicationSlotValidateNameInternal().
264 */
265bool
267 int elevel)
268{
269 int err_code;
270 char *err_msg = NULL;
271 char *err_hint = NULL;
272
274 &err_code, &err_msg, &err_hint))
275 {
276 /*
277 * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 * and errhint(), since the messages from
279 * ReplicationSlotValidateNameInternal() are already translated. This
280 * avoids double translation.
281 */
282 ereport(elevel,
284 errmsg_internal("%s", err_msg),
285 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286
287 pfree(err_msg);
288 if (err_hint != NULL)
290 return false;
291 }
292
293 return true;
294}
295
296/*
297 * Check whether the passed slot name is valid.
298 *
299 * An error will be reported for a reserved replication slot name if
300 * allow_reserved_name is set to false.
301 *
302 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
303 * the name to be used as a directory name on every supported OS.
304 *
305 * Returns true if the slot name is valid. Otherwise, returns false and stores
306 * the error code, error message, and optional hint in err_code, err_msg, and
307 * err_hint, respectively. The caller is responsible for freeing err_msg and
308 * err_hint, which are palloc'd.
309 */
310bool
312 int *err_code, char **err_msg, char **err_hint)
313{
314 const char *cp;
315
316 if (strlen(name) == 0)
317 {
319 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 *err_hint = NULL;
321 return false;
322 }
323
324 if (strlen(name) >= NAMEDATALEN)
325 {
327 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 *err_hint = NULL;
329 return false;
330 }
331
332 for (cp = name; *cp; cp++)
333 {
334 if (!((*cp >= 'a' && *cp <= 'z')
335 || (*cp >= '0' && *cp <= '9')
336 || (*cp == '_')))
337 {
339 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 return false;
342 }
343 }
344
346 {
348 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
351 return false;
352 }
353
354 return true;
355}
356
357/*
358 * Return true if the replication slot name is "pg_conflict_detection".
359 */
360static bool
362{
363 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364}
365
366/*
367 * Create a new replication slot and mark it as used by this backend.
368 *
369 * name: Name of the slot
370 * db_specific: logical decoding is db specific; if the slot is going to
371 * be used for that pass true, otherwise false.
372 * two_phase: If enabled, allows decoding of prepared transactions.
373 * failover: If enabled, allows the slot to be synced to standbys so
374 * that logical replication can be resumed after failover.
375 * synced: True if the slot is synchronized from the primary server.
376 */
377void
379 ReplicationSlotPersistency persistency,
380 bool two_phase, bool failover, bool synced)
381{
382 ReplicationSlot *slot = NULL;
383 int i;
384
386
387 /*
388 * The logical launcher or pg_upgrade may create or migrate an internal
389 * slot, so using a reserved name is allowed in these cases.
390 */
392 ERROR);
393
394 if (failover)
395 {
396 /*
397 * Do not allow users to create the failover enabled slots on the
398 * standby as we do not support sync to the cascading standby.
399 *
400 * However, failover enabled slots can be created during slot
401 * synchronization because we need to retain the same values as the
402 * remote slot.
403 */
407 errmsg("cannot enable failover for a replication slot created on the standby"));
408
409 /*
410 * Do not allow users to create failover enabled temporary slots,
411 * because temporary slots will not be synced to the standby.
412 *
413 * However, failover enabled temporary slots can be created during
414 * slot synchronization. See the comments atop slotsync.c for details.
415 */
416 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
419 errmsg("cannot enable failover for a temporary replication slot"));
420 }
421
422 /*
423 * If some other backend ran this code concurrently with us, we'd likely
424 * both allocate the same slot, and that would be bad. We'd also be at
425 * risk of missing a name collision. Also, we don't want to try to create
426 * a new slot while somebody's busy cleaning up an old one, because we
427 * might both be monkeying with the same directory.
428 */
430
431 /*
432 * Check for name collision, and identify an allocatable slot. We need to
433 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
434 * else can change the in_use flags while we're looking at them.
435 */
437 for (i = 0; i < max_replication_slots; i++)
438 {
440
441 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
444 errmsg("replication slot \"%s\" already exists", name)));
445 if (!s->in_use && slot == NULL)
446 slot = s;
447 }
449
450 /* If all slots are in use, we're out of luck. */
451 if (slot == NULL)
454 errmsg("all replication slots are in use"),
455 errhint("Free one or increase \"max_replication_slots\".")));
456
457 /*
458 * Since this slot is not in use, nobody should be looking at any part of
459 * it other than the in_use field unless they're trying to allocate it.
460 * And since we hold ReplicationSlotAllocationLock, nobody except us can
461 * be doing that. So it's safe to initialize the slot.
462 */
463 Assert(!slot->in_use);
464 Assert(slot->active_pid == 0);
465
466 /* first initialize persistent data */
467 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
468 namestrcpy(&slot->data.name, name);
470 slot->data.persistency = persistency;
471 slot->data.two_phase = two_phase;
473 slot->data.failover = failover;
474 slot->data.synced = synced;
475
476 /* and then data only present in shared memory */
477 slot->just_dirtied = false;
478 slot->dirty = false;
487 slot->inactive_since = 0;
489
490 /*
491 * Create the slot on disk. We haven't actually marked the slot allocated
492 * yet, so no special cleanup is required if this errors out.
493 */
494 CreateSlotOnDisk(slot);
495
496 /*
497 * We need to briefly prevent any other backend from iterating over the
498 * slots while we flip the in_use flag. We also need to set the active
499 * flag while holding the ControlLock as otherwise a concurrent
500 * ReplicationSlotAcquire() could acquire the slot as well.
501 */
503
504 slot->in_use = true;
505
506 /* We can now mark the slot active, and that makes it our slot. */
507 SpinLockAcquire(&slot->mutex);
508 Assert(slot->active_pid == 0);
509 slot->active_pid = MyProcPid;
510 SpinLockRelease(&slot->mutex);
511 MyReplicationSlot = slot;
512
514
515 /*
516 * Create statistics entry for the new logical slot. We don't collect any
517 * stats for physical slots, so no need to create an entry for the same.
518 * See ReplicationSlotDropPtr for why we need to do this before releasing
519 * ReplicationSlotAllocationLock.
520 */
521 if (SlotIsLogical(slot))
523
524 /*
525 * Now that the slot has been marked as in_use and active, it's safe to
526 * let somebody else try to allocate a slot.
527 */
529
530 /* Let everybody know we've modified this slot */
532}
533
534/*
535 * Search for the named replication slot.
536 *
537 * Return the replication slot if found, otherwise NULL.
538 */
541{
542 int i;
543 ReplicationSlot *slot = NULL;
544
545 if (need_lock)
547
548 for (i = 0; i < max_replication_slots; i++)
549 {
551
552 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
553 {
554 slot = s;
555 break;
556 }
557 }
558
559 if (need_lock)
561
562 return slot;
563}
564
565/*
566 * Return the index of the replication slot in
567 * ReplicationSlotCtl->replication_slots.
568 *
569 * This is mainly useful to have an efficient key for storing replication slot
570 * stats.
571 */
572int
580
581/*
582 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
583 * the slot's name and true is returned.
584 *
585 * This is likely only useful for pgstat_replslot.c during shutdown; in other
586 * cases there are obvious TOCTOU issues.
587 */
588bool
590{
591 ReplicationSlot *slot;
592 bool found;
593
595
596 /*
597 * Ensure that the slot cannot be dropped while we copy the name. Don't
598 * need the spinlock as the name of an existing slot cannot change.
599 */
601 found = slot->in_use;
602 if (slot->in_use)
605
606 return found;
607}
608
609/*
610 * Find a previously created slot and mark it as used by this process.
611 *
612 * An error is raised if nowait is true and the slot is currently in use. If
613 * nowait is false, we sleep until the slot is released by the owning process.
614 *
615 * An error is raised if error_if_invalid is true and the slot is found to
616 * be invalid. It should always be set to true, except when we are temporarily
617 * acquiring the slot and don't intend to change it.
618 */
619void
620ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
621{
623 int active_pid;
624
625 Assert(name != NULL);
626
627retry:
629
631
632 /* Check if the slot exists with the given name. */
634 if (s == NULL || !s->in_use)
635 {
637
640 errmsg("replication slot \"%s\" does not exist",
641 name)));
642 }
643
644 /*
645 * Do not allow users to acquire the reserved slot. This scenario may
646 * occur if the launcher that owns the slot has terminated unexpectedly
647 * due to an error, and a backend process attempts to reuse the slot.
648 */
652 errmsg("cannot acquire replication slot \"%s\"", name),
653 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
654
655 /*
656 * This is the slot we want; check if it's active under some other
657 * process. In single user mode, we don't need this check.
658 */
660 {
661 /*
662 * Get ready to sleep on the slot in case it is active. (We may end
663 * up not sleeping, but we don't want to do this while holding the
664 * spinlock.)
665 */
666 if (!nowait)
668
669 /*
670 * It is important to reset the inactive_since under spinlock here to
671 * avoid race conditions with slot invalidation. See comments related
672 * to inactive_since in InvalidatePossiblyObsoleteSlot.
673 */
675 if (s->active_pid == 0)
677 active_pid = s->active_pid;
680 }
681 else
682 {
683 s->active_pid = active_pid = MyProcPid;
685 }
687
688 /*
689 * If we found the slot but it's already active in another process, we
690 * wait until the owning process signals us that it's been released, or
691 * error out.
692 */
693 if (active_pid != MyProcPid)
694 {
695 if (!nowait)
696 {
697 /* Wait here until we get signaled, and then restart */
701 goto retry;
702 }
703
706 errmsg("replication slot \"%s\" is active for PID %d",
707 NameStr(s->data.name), active_pid)));
708 }
709 else if (!nowait)
710 ConditionVariableCancelSleep(); /* no sleep needed after all */
711
712 /* We made this slot active, so it's ours now. */
714
715 /*
716 * We need to check for invalidation after making the slot ours to avoid
717 * the possible race condition with the checkpointer that can otherwise
718 * invalidate the slot immediately after the check.
719 */
723 errmsg("can no longer access replication slot \"%s\"",
724 NameStr(s->data.name)),
725 errdetail("This replication slot has been invalidated due to \"%s\".",
727
728 /* Let everybody know we've modified this slot */
730
731 /*
732 * The call to pgstat_acquire_replslot() protects against stats for a
733 * different slot, from before a restart or such, being present during
734 * pgstat_report_replslot().
735 */
736 if (SlotIsLogical(s))
738
739
740 if (am_walsender)
741 {
744 ? errmsg("acquired logical replication slot \"%s\"",
745 NameStr(s->data.name))
746 : errmsg("acquired physical replication slot \"%s\"",
747 NameStr(s->data.name)));
748 }
749}
750
751/*
752 * Release the replication slot that this backend considers its own.
753 *
754 * This or another backend can re-acquire the slot later.
755 * Resources this slot requires will be preserved.
756 */
757void
759{
761 char *slotname = NULL; /* keep compiler quiet */
762 bool is_logical;
763 TimestampTz now = 0;
764
765 Assert(slot != NULL && slot->active_pid != 0);
766
768
769 if (am_walsender)
770 slotname = pstrdup(NameStr(slot->data.name));
771
772 if (slot->data.persistency == RS_EPHEMERAL)
773 {
774 /*
775 * Delete the slot. There is no !PANIC case where this is allowed to
776 * fail, all that may happen is an incomplete cleanup of the on-disk
777 * data.
778 */
780
781 /*
782 * Request to disable logical decoding, even though this slot may not
783 * have been the last logical slot. The checkpointer will verify if
784 * logical decoding should actually be disabled.
785 */
786 if (is_logical)
788 }
789
790 /*
791 * If slot needed to temporarily restrain both data and catalog xmin to
792 * create the catalog snapshot, remove that temporary constraint.
793 * Snapshots can only be exported while the initial snapshot is still
794 * acquired.
795 */
796 if (!TransactionIdIsValid(slot->data.xmin) &&
798 {
799 SpinLockAcquire(&slot->mutex);
801 SpinLockRelease(&slot->mutex);
803 }
804
805 /*
806 * Set the time since the slot has become inactive. We get the current
807 * time beforehand to avoid system call while holding the spinlock.
808 */
810
811 if (slot->data.persistency == RS_PERSISTENT)
812 {
813 /*
814 * Mark persistent slot inactive. We're not freeing it, just
815 * disconnecting, but wake up others that may be waiting for it.
816 */
817 SpinLockAcquire(&slot->mutex);
818 slot->active_pid = 0;
820 SpinLockRelease(&slot->mutex);
822 }
823 else
825
827
828 /* might not have been set when we've been a plain slot */
833
834 if (am_walsender)
835 {
838 ? errmsg("released logical replication slot \"%s\"",
839 slotname)
840 : errmsg("released physical replication slot \"%s\"",
841 slotname));
842
843 pfree(slotname);
844 }
845}
846
847/*
848 * Cleanup temporary slots created in current session.
849 *
850 * Cleanup only synced temporary slots if 'synced_only' is true, else
851 * cleanup all temporary slots.
852 *
853 * If this drops the last logical slot in the cluster, it requests that
854 * logical decoding be disabled.
855 */
856void
858{
859 int i;
861 bool dropped_logical = false;
862
864
865restart:
868 for (i = 0; i < max_replication_slots; i++)
869 {
871
872 if (!s->in_use)
873 continue;
874
876
879
880 if ((s->active_pid == MyProcPid &&
881 (!synced_only || s->data.synced)))
882 {
885 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
886
887 if (SlotIsLogical(s))
888 dropped_logical = true;
889
891
893 goto restart;
894 }
895 else
897 }
898
900
903}
904
905/*
906 * Permanently drop replication slot identified by the passed in name.
907 */
908void
909ReplicationSlotDrop(const char *name, bool nowait)
910{
911 bool is_logical;
912
914
915 ReplicationSlotAcquire(name, nowait, false);
916
917 /*
918 * Do not allow users to drop the slots which are currently being synced
919 * from the primary to the standby.
920 */
924 errmsg("cannot drop replication slot \"%s\"", name),
925 errdetail("This replication slot is being synchronized from the primary server."));
926
928
930
931 if (is_logical)
933}
934
935/*
936 * Change the definition of the slot identified by the specified name.
937 *
938 * Altering the two_phase property of a slot requires caution on the
939 * client-side. Enabling it at any random point during decoding has the
940 * risk that transactions prepared before this change may be skipped by
941 * the decoder, leading to missing prepare records on the client. So, we
942 * enable it for subscription related slots only once the initial tablesync
943 * is finished. See comments atop worker.c. Disabling it is safe only when
944 * there are no pending prepared transactions; otherwise, the changes of
945 * already prepared transactions can be replicated again along with their
946 * corresponding commit leading to duplicate data or errors.
947 */
948void
949ReplicationSlotAlter(const char *name, const bool *failover,
950 const bool *two_phase)
951{
952 bool update_slot = false;
953
956
957 ReplicationSlotAcquire(name, false, true);
958
962 errmsg("cannot use %s with a physical replication slot",
963 "ALTER_REPLICATION_SLOT"));
964
965 if (RecoveryInProgress())
966 {
967 /*
968 * Do not allow users to alter the slots which are currently being
969 * synced from the primary to the standby.
970 */
974 errmsg("cannot alter replication slot \"%s\"", name),
975 errdetail("This replication slot is being synchronized from the primary server."));
976
977 /*
978 * Do not allow users to enable failover on the standby as we do not
979 * support sync to the cascading standby.
980 */
981 if (failover && *failover)
984 errmsg("cannot enable failover for a replication slot"
985 " on the standby"));
986 }
987
988 if (failover)
989 {
990 /*
991 * Do not allow users to enable failover for temporary slots as we do
992 * not support syncing temporary slots to the standby.
993 */
997 errmsg("cannot enable failover for a temporary replication slot"));
998
1000 {
1004
1005 update_slot = true;
1006 }
1007 }
1008
1010 {
1014
1015 update_slot = true;
1016 }
1017
1018 if (update_slot)
1019 {
1022 }
1023
1025}
1026
1027/*
1028 * Permanently drop the currently acquired replication slot.
1029 */
1030void
1032{
1034
1036
1037 /* slot isn't acquired anymore */
1039
1041}
1042
1043/*
1044 * Permanently drop the replication slot which will be released by the point
1045 * this function returns.
1046 */
1047static void
1049{
1050 char path[MAXPGPATH];
1051 char tmppath[MAXPGPATH];
1052
1053 /*
1054 * If some other backend ran this code concurrently with us, we might try
1055 * to delete a slot with a certain name while someone else was trying to
1056 * create a slot with the same name.
1057 */
1059
1060 /* Generate pathnames. */
1061 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1062 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1063
1064 /*
1065 * Rename the slot directory on disk, so that we'll no longer recognize
1066 * this as a valid slot. Note that if this fails, we've got to mark the
1067 * slot inactive before bailing out. If we're dropping an ephemeral or a
1068 * temporary slot, we better never fail hard as the caller won't expect
1069 * the slot to survive and this might get called during error handling.
1070 */
1071 if (rename(path, tmppath) == 0)
1072 {
1073 /*
1074 * We need to fsync() the directory we just renamed and its parent to
1075 * make sure that our changes are on disk in a crash-safe fashion. If
1076 * fsync() fails, we can't be sure whether the changes are on disk or
1077 * not. For now, we handle that by panicking;
1078 * StartupReplicationSlots() will try to straighten it out after
1079 * restart.
1080 */
1082 fsync_fname(tmppath, true);
1085 }
1086 else
1087 {
1088 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1089
1090 SpinLockAcquire(&slot->mutex);
1091 slot->active_pid = 0;
1092 SpinLockRelease(&slot->mutex);
1093
1094 /* wake up anyone waiting on this slot */
1096
1099 errmsg("could not rename file \"%s\" to \"%s\": %m",
1100 path, tmppath)));
1101 }
1102
1103 /*
1104 * The slot is definitely gone. Lock out concurrent scans of the array
1105 * long enough to kill it. It's OK to clear the active PID here without
1106 * grabbing the mutex because nobody else can be scanning the array here,
1107 * and nobody can be attached to this slot and thus access it without
1108 * scanning the array.
1109 *
1110 * Also wake up processes waiting for it.
1111 */
1113 slot->active_pid = 0;
1114 slot->in_use = false;
1117
1118 /*
1119 * Slot is dead and doesn't prevent resource removal anymore, recompute
1120 * limits.
1121 */
1124
1125 /*
1126 * If removing the directory fails, the worst thing that will happen is
1127 * that the user won't be able to create a new slot with the same name
1128 * until the next server restart. We warn about it, but that's all.
1129 */
1130 if (!rmtree(tmppath, true))
1132 (errmsg("could not remove directory \"%s\"", tmppath)));
1133
1134 /*
1135 * Drop the statistics entry for the replication slot. Do this while
1136 * holding ReplicationSlotAllocationLock so that we don't drop a
1137 * statistics entry for another slot with the same name just created in
1138 * another session.
1139 */
1140 if (SlotIsLogical(slot))
1142
1143 /*
1144 * We release this at the very end, so that nobody starts trying to create
1145 * a slot while we're still cleaning up the detritus of the old one.
1146 */
1148}
1149
1150/*
1151 * Serialize the currently acquired slot's state from memory to disk, thereby
1152 * guaranteeing the current state will survive a crash.
1153 */
1154void
1156{
1157 char path[MAXPGPATH];
1158
1160
1163}
1164
1165/*
1166 * Signal that it would be useful if the currently acquired slot would be
1167 * flushed out to disk.
1168 *
1169 * Note that the actual flush to disk can be delayed for a long time; if
1170 * required for correctness, explicitly do a ReplicationSlotSave().
1171 */
1172void
1174{
1176
1178
1179 SpinLockAcquire(&slot->mutex);
1181 MyReplicationSlot->dirty = true;
1182 SpinLockRelease(&slot->mutex);
1183}
1184
1185/*
1186 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1187 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1188 */
1189void
1191{
1193
1194 Assert(slot != NULL);
1196
1197 SpinLockAcquire(&slot->mutex);
1199 SpinLockRelease(&slot->mutex);
1200
1203}
1204
1205/*
1206 * Compute the oldest xmin across all slots and store it in the ProcArray.
1207 *
1208 * If already_locked is true, both the ReplicationSlotControlLock and the
1209 * ProcArrayLock have already been acquired exclusively. It is crucial that the
1210 * caller first acquires the ReplicationSlotControlLock, followed by the
1211 * ProcArrayLock, to prevent any undetectable deadlocks since this function
1212 * acquires them in that order.
1213 */
1214void
1216{
1217 int i;
1220
1225
1226 /*
1227 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1228 * values, so no backend updates the initial xmin for newly created slot
1229 * concurrently. A shared lock is used here to minimize lock contention,
1230 * especially when many slots exist and advancements occur frequently.
1231 * This is safe since an exclusive lock is taken during initial slot xmin
1232 * update in slot creation.
1233 *
1234 * One might think that we can hold the ProcArrayLock exclusively and
1235 * update the slot xmin values, but it could increase lock contention on
1236 * the ProcArrayLock, which is not great since this function can be called
1237 * at non-negligible frequency.
1238 *
1239 * Concurrent invocation of this function may cause the computed slot xmin
1240 * to regress. However, this is harmless because tuples prior to the most
1241 * recent xmin are no longer useful once advancement occurs (see
1242 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1243 * before updating the effective_xmin). Thus, such regression merely
1244 * prevents VACUUM from prematurely removing tuples without causing the
1245 * early deletion of required data.
1246 */
1247 if (!already_locked)
1249
1250 for (i = 0; i < max_replication_slots; i++)
1251 {
1253 TransactionId effective_xmin;
1254 TransactionId effective_catalog_xmin;
1255 bool invalidated;
1256
1257 if (!s->in_use)
1258 continue;
1259
1261 effective_xmin = s->effective_xmin;
1262 effective_catalog_xmin = s->effective_catalog_xmin;
1263 invalidated = s->data.invalidated != RS_INVAL_NONE;
1265
1266 /* invalidated slots need not apply */
1267 if (invalidated)
1268 continue;
1269
1270 /* check the data xmin */
1271 if (TransactionIdIsValid(effective_xmin) &&
1273 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1274 agg_xmin = effective_xmin;
1275
1276 /* check the catalog xmin */
1277 if (TransactionIdIsValid(effective_catalog_xmin) &&
1279 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1280 agg_catalog_xmin = effective_catalog_xmin;
1281 }
1282
1284
1285 if (!already_locked)
1287}
1288
1289/*
1290 * Compute the oldest restart LSN across all slots and inform xlog module.
1291 *
1292 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1293 * purpose, we don't try to account for that, because this module doesn't
1294 * know what to compare against.
 *
 * Invalidated slots are skipped. For persistent slots, the older of
 * restart_lsn and last_saved_restart_lsn is used, so WAL that the slot's
 * on-disk state could still require after a crash is not treated as
 * removable.
1295 */
1296void
1298{
1299 int i;
1301
1303
1305 for (i = 0; i < max_replication_slots; i++)
1306 {
1308 XLogRecPtr restart_lsn;
1309 XLogRecPtr last_saved_restart_lsn;
1310 bool invalidated;
1311 ReplicationSlotPersistency persistency;
1312
1313 if (!s->in_use)
1314 continue;
1315
1317 persistency = s->data.persistency;
1318 restart_lsn = s->data.restart_lsn;
1319 invalidated = s->data.invalidated != RS_INVAL_NONE;
1320 last_saved_restart_lsn = s->last_saved_restart_lsn;
1322
1323 /* invalidated slots need not apply */
1324 if (invalidated)
1325 continue;
1326
1327 /*
1328 * For persistent slots, use last_saved_restart_lsn to compute the
1329 * oldest LSN for removal of WAL segments. The segments between
1330 * last_saved_restart_lsn and restart_lsn might be needed by a
1331 * persistent slot in the case of database crash. Non-persistent
1332 * slots can't survive the database crash, so we don't care about
1333 * last_saved_restart_lsn for them.
1334 */
1335 if (persistency == RS_PERSISTENT)
1336 {
1337 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1338 restart_lsn > last_saved_restart_lsn)
1339 {
1340 restart_lsn = last_saved_restart_lsn;
1341 }
1342 }
1343
		/* keep the smallest valid restart LSN seen so far */
1344 if (XLogRecPtrIsValid(restart_lsn) &&
1346 restart_lsn < min_required))
1347 min_required = restart_lsn;
1348 }
1350
1352}
1353
1354/*
1355 * Compute the oldest WAL LSN required by *logical* decoding slots.
1356 *
1357 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1358 * slots exist.
 *
 * Invalidated logical slots, and logical slots without a valid restart LSN,
 * are ignored. As in ReplicationSlotsComputeRequiredLSN(), persistent slots
 * contribute the older of restart_lsn and last_saved_restart_lsn.
1359 *
1360 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1361 * ignores physical replication slots.
1362 *
1363 * The results aren't required frequently, so we don't maintain a precomputed
1364 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1365 */
1368{
1370 int i;
1371
1372 if (max_replication_slots <= 0)
1373 return InvalidXLogRecPtr;
1374
1376
1377 for (i = 0; i < max_replication_slots; i++)
1378 {
1379 ReplicationSlot *s;
1380 XLogRecPtr restart_lsn;
1381 XLogRecPtr last_saved_restart_lsn;
1382 bool invalidated;
1383 ReplicationSlotPersistency persistency;
1384
1386
1387 /* cannot change while ReplicationSlotControlLock is held */
1388 if (!s->in_use)
1389 continue;
1390
1391 /* we're only interested in logical slots */
1392 if (!SlotIsLogical(s))
1393 continue;
1394
1395 /* read once, it's ok if it increases while we're checking */
1397 persistency = s->data.persistency;
1398 restart_lsn = s->data.restart_lsn;
1399 invalidated = s->data.invalidated != RS_INVAL_NONE;
1400 last_saved_restart_lsn = s->last_saved_restart_lsn;
1402
1403 /* invalidated slots need not apply */
1404 if (invalidated)
1405 continue;
1406
1407 /*
1408 * For persistent slots, use last_saved_restart_lsn to compute the
1409 * oldest LSN for removal of WAL segments. The segments between
1410 * last_saved_restart_lsn and restart_lsn might be needed by a
1411 * persistent slot in the case of database crash. Non-persistent
1412 * slots can't survive the database crash, so we don't care about
1413 * last_saved_restart_lsn for them.
1414 */
1415 if (persistency == RS_PERSISTENT)
1416 {
1417 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1418 restart_lsn > last_saved_restart_lsn)
1419 {
1420 restart_lsn = last_saved_restart_lsn;
1421 }
1422 }
1423
1424 if (!XLogRecPtrIsValid(restart_lsn))
1425 continue;
1426
		/* keep the smallest valid restart LSN seen so far */
1427 if (!XLogRecPtrIsValid(result) ||
1428 restart_lsn < result)
1429 result = restart_lsn;
1430 }
1431
1433
1434 return result;
1435}
1436
1437/*
1438 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1439 * passed database oid.
1440 *
1441 * Returns true if there are any slots referencing the database. *nslots will
1442 * be set to the absolute number of slots in the database, *nactive to ones
1443 * currently active.
 *
 * Only logical slots are considered, since physical slots are not tied to
 * a database. Invalidated slots are included in the counts. A slot counts
 * as active when its active_pid is nonzero. Both counters are zeroed first;
 * if max_replication_slots <= 0, false is returned without scanning.
1444 */
1445bool
1447{
1448 int i;
1449
1450 *nslots = *nactive = 0;
1451
1452 if (max_replication_slots <= 0)
1453 return false;
1454
1456 for (i = 0; i < max_replication_slots; i++)
1457 {
1458 ReplicationSlot *s;
1459
1461
1462 /* cannot change while ReplicationSlotControlLock is held */
1463 if (!s->in_use)
1464 continue;
1465
1466 /* only logical slots are database specific, skip */
1467 if (!SlotIsLogical(s))
1468 continue;
1469
1470 /* not our database, skip */
1471 if (s->data.database != dboid)
1472 continue;
1473
1474 /* NB: intentionally counting invalidated slots */
1475
1476 /* count slots with spinlock held */
1478 (*nslots)++;
1479 if (s->active_pid != 0)
1480 (*nactive)++;
1482 }
1484
1485 if (*nslots > 0)
1486 return true;
1487 return false;
1488}
1489
1490/*
1491 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1492 * passed database oid. The caller should hold an exclusive lock on the
1493 * pg_database oid for the database to prevent creation of new slots on the db
1494 * or replay from existing slots.
1495 *
1496 * Another session that concurrently acquires an existing slot on the target DB
1497 * (most likely to drop it) may cause this function to ERROR. If that happens
1498 * it may have dropped some but not all slots.
1499 *
1500 * This routine isn't as efficient as it could be - but we don't drop
1501 * databases often, especially databases with lots of slots.
1502 *
1503 * If it drops the last logical slot in the cluster, it requests to disable
1504 * logical decoding.
1505 */
1506void
1508{
1509 int i;
1511 bool dropped = false;
1512
1513 if (max_replication_slots <= 0)
1514 return;
1515
1516restart:
1519 for (i = 0; i < max_replication_slots; i++)
1520 {
1521 ReplicationSlot *s;
1522 char *slotname;
1523 int active_pid;
1524
1526
1527 /* cannot change while ReplicationSlotControlLock is held */
1528 if (!s->in_use)
1529 continue;
1530
1531 /* only logical slots are database specific, skip */
1532 if (!SlotIsLogical(s))
1533 continue;
1534
1535 /*
1536 * Check logical slots on other databases too, so that we request
1537 * disabling logical decoding only if no valid logical slot remains
 * anywhere in the cluster.
1538 */
1542
1543 /* not our database, skip */
1544 if (s->data.database != dboid)
1545 continue;
1546
1547 /* NB: intentionally including invalidated slots to drop */
1548
1549 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1551 /* can't change while ReplicationSlotControlLock is held */
1552 slotname = NameStr(s->data.name);
1553 active_pid = s->active_pid;
1554 if (active_pid == 0)
1555 {
1557 s->active_pid = MyProcPid;
1558 }
1560
1561 /*
1562 * Even though we hold an exclusive lock on the database object a
1563 * logical slot for that DB can still be active, e.g. if it's
1564 * concurrently being dropped by a backend connected to another DB.
1565 *
1566 * That's fairly unlikely in practice, so we'll just bail out.
1567 *
1568 * The slot sync worker holds a shared lock on the database before
1569 * operating on synced logical slots to avoid conflict with the drop
1570 * happening here. The persistent synced slots are thus safe but there
1571 * is a possibility that the slot sync worker has created a temporary
1572 * slot (which stays active even on release) and we are trying to drop
1573 * that here. In practice, the chances of hitting this scenario are
1574 * less as during slot synchronization, the temporary slot is
1575 * immediately converted to persistent and thus is safe due to the
1576 * shared lock taken on the database. So, we'll just bail out in such
1577 * a case.
1578 *
1579 * XXX: We can consider shutting down the slot sync worker before
1580 * trying to drop synced temporary slots here.
1581 */
1582 if (active_pid)
1583 ereport(ERROR,
1585 errmsg("replication slot \"%s\" is active for PID %d",
1586 slotname, active_pid)));
1587
1588 /*
1589 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1590 * holding ReplicationSlotControlLock over filesystem operations,
1591 * release ReplicationSlotControlLock and use
1592 * ReplicationSlotDropAcquired.
1593 *
1594 * As that means the set of slots could change, restart scan from the
1595 * beginning each time we release the lock.
1596 */
1599 dropped = true;
1600 goto restart;
1601 }
1603
1604 if (dropped && !found_valid_logicalslot)
1606}
1607
1608/*
1609 * Returns true if there is at least one in-use valid logical replication slot.
 *
 * Physical slots and invalidated slots are skipped. Returns false
 * immediately if max_replication_slots <= 0.
1610 */
1611bool
1613{
1614 bool found = false;
1615
1616 if (max_replication_slots <= 0)
1617 return false;
1618
1620 for (int i = 0; i < max_replication_slots; i++)
1621 {
1622 ReplicationSlot *s;
1623 bool invalidated;
1624
1626
1627 /* cannot change while ReplicationSlotControlLock is held */
1628 if (!s->in_use)
1629 continue;
1630
1631 if (SlotIsPhysical(s))
1632 continue;
1633
1635 invalidated = s->data.invalidated != RS_INVAL_NONE;
1637
1638 if (invalidated)
1639 continue;
1640
		/* one valid logical slot is enough; stop scanning */
1641 found = true;
1642 break;
1643 }
1645
1646 return found;
1647}
1648
1649/*
1650 * Check whether the server's configuration supports using replication
1651 * slots.
 *
 * Raises an ERROR if "max_replication_slots" is 0, or if "wal_level" is
 * below "replica".
1652 */
1653void
1655{
1656 /*
1657 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1658 * needs the same check.
1659 */
1660
1661 if (max_replication_slots == 0)
1662 ereport(ERROR,
1664 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1665
1667 ereport(ERROR,
1669 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1670}
1671
1672/*
1673 * Check whether the user has privilege to use replication slots.
 *
 * Raises an ERROR ("permission denied to use replication slots") when the
 * check fails. Per the error detail, only roles with the REPLICATION
 * attribute may use replication slots.
1674 */
1675void
1677{
1679 ereport(ERROR,
1681 errmsg("permission denied to use replication slots"),
1682 errdetail("Only roles with the %s attribute may use replication slots.",
1683 "REPLICATION")));
1684}
1685
1686/*
1687 * Reserve WAL for the currently active slot.
1688 *
1689 * Compute and set restart_lsn in a manner that's appropriate for the type of
1690 * the slot and concurrency safe.
 *
 * Physical slots start at the current redo pointer. Logical slots start at
 * the replay position during recovery, or at the current WAL insert
 * position otherwise. Raises an ERROR if XLogGetLastRemovedSegno() shows
 * that the WAL needed for the chosen restart_lsn has already been removed.
1691 */
1692void
1694{
1696 XLogSegNo segno;
1697 XLogRecPtr restart_lsn;
1698
1699 Assert(slot != NULL);
1702
1703 /*
1704 * The replication slot mechanism is used to prevent the removal of
1705 * required WAL.
1706 *
1707 * Acquire an exclusive lock to prevent the checkpoint process from
1708 * concurrently computing the minimum slot LSN (see
1709 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1710 * replication cannot be removed during a checkpoint.
1711 *
1712 * The mechanism is reliable because if WAL reservation occurs first, the
1713 * checkpoint must wait for the restart_lsn update before determining the
1714 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1715 * first, subsequent WAL reservations will select positions at or beyond
1716 * the redo pointer of that checkpoint.
1717 */
1719
1720 /*
1721 * For logical slots log a standby snapshot and start logical decoding at
1722 * exactly that position. That allows the slot to start up more quickly.
1723 * But on a standby we cannot do WAL writes, so just use the replay
1724 * pointer; effectively, an attempt to create a logical slot on standby
1725 * will cause it to wait for an xl_running_xact record to be logged
1726 * independently on the primary, so that a snapshot can be built using the
1727 * record.
1728 *
1729 * None of this is needed (or indeed helpful) for physical slots as
1730 * they'll start replay at the last logged checkpoint anyway. Instead,
1731 * return the location of the last redo LSN, where a base backup has to
1732 * start replay at.
1733 */
1734 if (SlotIsPhysical(slot))
1735 restart_lsn = GetRedoRecPtr();
1736 else if (RecoveryInProgress())
1737 restart_lsn = GetXLogReplayRecPtr(NULL);
1738 else
1739 restart_lsn = GetXLogInsertRecPtr();
1740
1741 SpinLockAcquire(&slot->mutex);
1742 slot->data.restart_lsn = restart_lsn;
1743 SpinLockRelease(&slot->mutex);
1744
1745 /* prevent WAL removal as fast as possible */
1747
1748 /* Checkpoint shouldn't remove the required WAL. */
1750 if (XLogGetLastRemovedSegno() >= segno)
1751 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1752 NameStr(slot->data.name));
1753
1755
1756 if (!RecoveryInProgress() && SlotIsLogical(slot))
1757 {
1759
1760 /* make sure we have enough information to start */
1762
1763 /* and make sure it's fsynced to disk */
1765 }
1766}
1767
1768/*
1769 * Report that a replication slot needs to be invalidated.
 *
 * Emits a LOG message about the slot named "slotname". If "terminating" is
 * true, the message says that process "pid" is being terminated to release
 * the slot; otherwise it says the slot is being invalidated. The detail
 * text is chosen according to "cause". For the WAL-removal and idle-timeout
 * causes, a hint naming the relevant GUC is added as well.
1770 */
1771static void
1773 bool terminating,
1774 int pid,
1775 NameData slotname,
1776 XLogRecPtr restart_lsn,
1778 TransactionId snapshotConflictHorizon,
1779 long slot_idle_seconds)
1780{
1783
1786
1787 switch (cause)
1788 {
1790 {
			/* how far restart_lsn lags behind the oldest kept LSN */
1791 uint64 ex = oldestLSN - restart_lsn;
1792
1794 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1795 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1796 ex),
1797 LSN_FORMAT_ARGS(restart_lsn),
1798 ex);
1799 /* translator: %s is a GUC variable name */
1800 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1801 "max_slot_wal_keep_size");
1802 break;
1803 }
1804 case RS_INVAL_HORIZON:
1805 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1806 snapshotConflictHorizon);
1807 break;
1808
1809 case RS_INVAL_WAL_LEVEL:
1810 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1811 break;
1812
1814 {
1815 /* translator: %s is a GUC variable name */
1816 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1817 slot_idle_seconds, "idle_replication_slot_timeout",
1819 /* translator: %s is a GUC variable name */
1820 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1821 "idle_replication_slot_timeout");
1822 break;
1823 }
1824 case RS_INVAL_NONE:
1826 }
1827
	/* the hint is attached only when one of the cases above produced one */
1828 ereport(LOG,
1829 terminating ?
1830 errmsg("terminating process %d to release replication slot \"%s\"",
1831 pid, NameStr(slotname)) :
1832 errmsg("invalidating obsolete replication slot \"%s\"",
1833 NameStr(slotname)),
1834 errdetail_internal("%s", err_detail.data),
1835 err_hint.len ? errhint("%s", err_hint.data) : 0);
1836
1837 pfree(err_detail.data);
1838 pfree(err_hint.data);
1839}
1840
1841/*
1842 * Can we invalidate an idle replication slot?
1843 *
1844 * Idle timeout invalidation is allowed only when all of the following hold:
1845 *
1846 * 1. Idle timeout is set
1847 * 2. Slot has reserved WAL
1848 * 3. Slot is inactive
1849 * 4. The slot is not being synced from the primary while the server is in
1850 * recovery. Synced slots are excluded because they are always considered
1851 * inactive: they don't perform logical decoding to produce changes.
1852 */
1853static inline bool
1861
1862/*
1863 * DetermineSlotInvalidationCause - Determine the cause for which a slot
1864 * becomes invalid among the given possible causes.
1865 *
1866 * This function sequentially checks all possible invalidation causes and
1867 * returns the first one for which the slot is eligible for invalidation.
 *
 * The causes are examined in this order:
 *	- RS_INVAL_WAL_REMOVED: the slot has a valid restart_lsn older than
 *	  oldestLSN;
 *	- RS_INVAL_HORIZON: a logical slot in database dboid (or in any database
 *	  when dboid is InvalidOid) whose xmin horizon precedes or equals
 *	  snapshotConflictHorizon;
 *	- RS_INVAL_WAL_LEVEL: any logical slot;
 *	- RS_INVAL_IDLE_TIMEOUT: an idle slot that CanInvalidateIdleSlot()
 *	  accepts. In this case *inactive_since is set to the inactivity start
 *	  time that was used, and "now" must be > 0.
 * Returns RS_INVAL_NONE if no cause applies.
1868 */
1871 XLogRecPtr oldestLSN, Oid dboid,
1872 TransactionId snapshotConflictHorizon,
1873 TimestampTz *inactive_since, TimestampTz now)
1874{
1876
1878 {
1879 XLogRecPtr restart_lsn = s->data.restart_lsn;
1880
1881 if (XLogRecPtrIsValid(restart_lsn) &&
1882 restart_lsn < oldestLSN)
1883 return RS_INVAL_WAL_REMOVED;
1884 }
1885
1887 {
1888 /* invalid DB oid signals a shared relation */
1889 if (SlotIsLogical(s) &&
1890 (dboid == InvalidOid || dboid == s->data.database))
1891 {
1892 TransactionId effective_xmin = s->effective_xmin;
1894
1895 if (TransactionIdIsValid(effective_xmin) &&
1896 TransactionIdPrecedesOrEquals(effective_xmin,
1897 snapshotConflictHorizon))
1898 return RS_INVAL_HORIZON;
1901 snapshotConflictHorizon))
1902 return RS_INVAL_HORIZON;
1903 }
1904 }
1905
1907 {
1908 if (SlotIsLogical(s))
1909 return RS_INVAL_WAL_LEVEL;
1910 }
1911
1913 {
1914 Assert(now > 0);
1915
1916 if (CanInvalidateIdleSlot(s))
1917 {
1918 /*
1919 * Simulate the invalidation due to idle_timeout to test the
1920 * timeout behavior promptly, without waiting for it to trigger
1921 * naturally.
1922 */
1923#ifdef USE_INJECTION_POINTS
1924 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1925 {
1926 *inactive_since = 0; /* since the beginning of time */
1927 return RS_INVAL_IDLE_TIMEOUT;
1928 }
1929#endif
1930
1931 /*
1932 * Check if the slot needs to be invalidated due to
1933 * idle_replication_slot_timeout GUC.
1934 */
1937 {
1938 *inactive_since = s->inactive_since;
1939 return RS_INVAL_IDLE_TIMEOUT;
1940 }
1941 }
1942 }
1943
1944 return RS_INVAL_NONE;
1945}
1946
1947/*
1948 * Helper for InvalidateObsoleteReplicationSlots
1949 *
1950 * Acquires the given slot and marks it invalid, if necessary and possible.
1951 *
1952 * Returns true if the slot was invalidated.
1953 *
1954 * Set *released_lock_out if ReplicationSlotControlLock was released in the
1955 * interim (and in that case we're not holding the lock at return, otherwise
1956 * we are).
1957 *
 * If another process currently holds the slot, that process is signalled:
 * via SendProcSignal() when we are the startup process, otherwise with
 * SIGTERM. We then wait on the slot's condition variable until the slot is
 * released, and retry. Each distinct owner PID is signalled at most once.
 *
1958 * This is inherently racy, because we release the LWLock
1959 * for syscalls, so caller must restart if we return true.
1960 */
1961static bool
1963 ReplicationSlot *s,
1965 Oid dboid, TransactionId snapshotConflictHorizon,
1966 bool *released_lock_out)
1967{
1968 int last_signaled_pid = 0;
1969 bool released_lock = false;
1970 bool invalidated = false;
1971 TimestampTz inactive_since = 0;
1972
1973 for (;;)
1974 {
1975 XLogRecPtr restart_lsn;
1976 NameData slotname;
1977 int active_pid = 0;
1979 TimestampTz now = 0;
1980 long slot_idle_secs = 0;
1981
1983
1984 if (!s->in_use)
1985 {
1986 if (released_lock)
1988 break;
1989 }
1990
1992 {
1993 /*
1994 * Assign the current time here to avoid system call overhead
1995 * while holding the spinlock in subsequent code.
1996 */
1998 }
1999
2000 /*
2001 * Check if the slot needs to be invalidated. If it needs to be
2002 * invalidated, and is not currently acquired, acquire it and mark it
2003 * as having been invalidated. We do this with the spinlock held to
2004 * avoid race conditions -- for example the restart_lsn could move
2005 * forward, or the slot could be dropped.
2006 */
2008
2009 restart_lsn = s->data.restart_lsn;
2010
2011 /* we do nothing if the slot is already invalid */
2012 if (s->data.invalidated == RS_INVAL_NONE)
2014 s, oldestLSN,
2015 dboid,
2016 snapshotConflictHorizon,
2017 &inactive_since,
2018 now);
2019
2020 /* if there's no invalidation, we're done */
2022 {
2024 if (released_lock)
2026 break;
2027 }
2028
2029 slotname = s->data.name;
2030 active_pid = s->active_pid;
2031
2032 /*
2033 * If the slot can be acquired, do so and mark it invalidated
2034 * immediately. Otherwise we'll signal the owning process, below, and
2035 * retry.
2036 *
2037 * Note: Unlike other slot attributes, slot's inactive_since can't be
2038 * changed until the acquired slot is released or the owning process
2039 * is terminated. So, the inactive slot can only be invalidated
2040 * immediately without being terminated.
2041 */
2042 if (active_pid == 0)
2043 {
2045 s->active_pid = MyProcPid;
2047
2048 /*
2049 * XXX: We should consider not overwriting restart_lsn and instead
2050 * just rely on .invalidated.
2051 */
2053 {
2056 }
2057
2058 /* Let caller know */
2059 invalidated = true;
2060 }
2061
2063
2064 /*
2065 * Calculate the idle time duration of the slot if slot is marked
2066 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2067 */
2069 {
2070 int slot_idle_usecs;
2071
2072 TimestampDifference(inactive_since, now, &slot_idle_secs,
2074 }
2075
2076 if (active_pid != 0)
2077 {
2078 /*
2079 * Prepare the sleep on the slot's condition variable before
2080 * releasing the lock, to close a possible race condition if the
2081 * slot is released before the sleep below.
2082 */
2084
2086 released_lock = true;
2087
2088 /*
2089 * Signal to terminate the process that owns the slot, if we
2090 * haven't already signalled it. (Avoidance of repeated
2091 * signalling is the only reason for there to be a loop in this
2092 * routine; otherwise we could rely on caller's restart loop.)
2093 *
2094 * There is the race condition that other process may own the slot
2095 * after its current owner process is terminated and before this
2096 * process owns it. To handle that, we signal only if the PID of
2097 * the owning process has changed from the previous time. (This
2098 * logic assumes that the same PID is not reused very quickly.)
2099 */
2100 if (last_signaled_pid != active_pid)
2101 {
2103 slotname, restart_lsn,
2104 oldestLSN, snapshotConflictHorizon,
2106
2107 if (MyBackendType == B_STARTUP)
2108 (void) SendProcSignal(active_pid,
2111 else
2112 (void) kill(active_pid, SIGTERM);
2113
2114 last_signaled_pid = active_pid;
2115 }
2116
2117 /* Wait until the slot is released. */
2120
2121 /*
2122 * Re-acquire lock and start over; we expect to invalidate the
2123 * slot next time (unless another process acquires the slot in the
2124 * meantime).
2125 *
2126 * Note: It is possible for a slot to advance its restart_lsn or
2127 * xmin values sufficiently between when we release the mutex and
2128 * when we recheck, moving from a conflicting state to a non
2129 * conflicting state. This is intentional and safe: if the slot
2130 * has caught up while we're busy here, the resources we were
2131 * concerned about (WAL segments or tuples) have not yet been
2132 * removed, and there's no reason to invalidate the slot.
2133 */
2135 continue;
2136 }
2137 else
2138 {
2139 /*
2140 * We hold the slot now and have already invalidated it; flush it
2141 * to ensure that state persists.
2142 *
2143 * Don't want to hold ReplicationSlotControlLock across file
2144 * system operations, so release it now but be sure to tell caller
2145 * to restart from scratch.
2146 */
2148 released_lock = true;
2149
2150 /* Make sure the invalidated state persists across server restart */
2154
2155 ReportSlotInvalidation(invalidation_cause, false, active_pid,
2156 slotname, restart_lsn,
2157 oldestLSN, snapshotConflictHorizon,
2159
2160 /* done with this slot for now */
2161 break;
2162 }
2163 }
2164
2166
2168 return invalidated;
2169}
2170
2171/*
2172 * Invalidate slots that require resources about to be removed.
2173 *
2174 * Returns true if any slot has been invalidated.
2175 *
2176 * Whether a slot needs to be invalidated depends on the invalidation cause.
2177 * A slot is invalidated if it:
2178 * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
2179 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2180 * db; dboid may be InvalidOid for shared relations
2181 * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2182 * logical.
2183 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2184 * "idle_replication_slot_timeout" duration.
2185 *
2186 * Note: This function attempts to invalidate the slot for multiple possible
2187 * causes in a single pass, minimizing redundant iterations. The
2188 * "possible_causes" parameter can be a bitmask of one or more of the
 * defined causes.
2189 *
2190 * If it invalidates the last logical slot in the cluster, it requests to
2191 * disable logical decoding.
2192 *
2193 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2194 */
2195bool
2197 XLogSegNo oldestSegno, Oid dboid,
2198 TransactionId snapshotConflictHorizon)
2199{
2201 bool invalidated = false;
2202 bool invalidated_logical = false;
2204
2205 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2208
2209 if (max_replication_slots == 0)
2210 return invalidated;
2211
2213
2214restart:
2217 for (int i = 0; i < max_replication_slots; i++)
2218 {
2220 bool released_lock = false;
2221
2222 if (!s->in_use)
2223 continue;
2224
2225 /* Prevent invalidation of logical slots during binary upgrade */
2227 {
2231
2232 continue;
2233 }
2234
2236 dboid, snapshotConflictHorizon,
2237 &released_lock))
2238 {
2240
2241 /* Remember we have invalidated a physical or logical slot */
2242 invalidated = true;
2243
2244 /*
2245 * Additionally, remember we have invalidated a logical slot as we
2246 * can request disabling logical decoding later.
2247 */
2248 if (SlotIsLogical(s))
2249 invalidated_logical = true;
2250 }
2251 else
2252 {
2253 /*
2254 * We need to check if the slot is invalidated here since
2255 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2256 * is already invalidated.
2257 */
2262 }
2263
2264 /* if the lock was released, start from scratch */
2265 if (released_lock)
2266 goto restart;
2267 }
2269
2270 /*
2271 * If any slots have been invalidated, recalculate the resource limits.
2272 */
2273 if (invalidated)
2274 {
2277 }
2278
2279 /*
2280 * Request the checkpointer to disable logical decoding if no valid
2281 * logical slots remain. If called by the checkpointer during a
2282 * checkpoint, only the request is initiated; actual deactivation is
2283 * deferred until after the checkpoint completes.
2284 */
2287
2288 return invalidated;
2289}
2290
2291/*
2292 * Flush all replication slots to disk.
2293 *
2294 * It is convenient to flush dirty replication slots at the time of checkpoint.
2295 * Additionally, in case of a shutdown checkpoint, we also identify the slots
2296 * for which the confirmed_flush LSN has been updated since the last time it
2297 * was saved and flush them.
 *
 * Slots are saved with SaveSlotToPath(..., LOG), so a failure to write an
 * individual slot is logged rather than raised as an error.
2298 */
2299void
2301{
2302 int i;
2303 bool last_saved_restart_lsn_updated = false;
2304
2305 elog(DEBUG1, "performing replication slot checkpoint");
2306
2307 /*
2308 * Prevent any slot from being created/dropped while we're active. As we
2309 * explicitly do *not* want to block iterating over replication_slots or
2310 * acquiring a slot we cannot take the control lock - but that's OK,
2311 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2312 * enough to guarantee that nobody can change the in_use bits on us.
2313 *
2314 * Additionally, acquiring the Allocation lock is necessary to serialize
2315 * the slot flush process with concurrent slot WAL reservation. This
2316 * ensures that the WAL position being reserved is either flushed to disk
2317 * or is beyond or equal to the redo pointer of the current checkpoint
2318 * (See ReplicationSlotReserveWal for details).
2319 */
2321
2322 for (i = 0; i < max_replication_slots; i++)
2323 {
2325 char path[MAXPGPATH];
2326
2327 if (!s->in_use)
2328 continue;
2329
2330 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2331 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2332
2333 /*
2334 * Slot's data is not flushed each time the confirmed_flush LSN is
2335 * updated as that could lead to frequent writes. However, we decide
2336 * to force a flush of all logical slots' data at the time of shutdown
2337 * if the confirmed_flush LSN is changed since we last flushed it to
2338 * disk. This helps in avoiding an unnecessary retreat of the
2339 * confirmed_flush LSN after restart.
2340 */
2341 if (is_shutdown && SlotIsLogical(s))
2342 {
2344
2345 if (s->data.invalidated == RS_INVAL_NONE &&
2347 {
2348 s->just_dirtied = true;
2349 s->dirty = true;
2350 }
2352 }
2353
2354 /*
2355 * Track if we're going to update slot's last_saved_restart_lsn. We
2356 * need this to know if we need to recompute the required LSN.
2357 */
2360
2361 SaveSlotToPath(s, path, LOG);
2362 }
2364
2365 /*
2366 * Recompute the required LSN if SaveSlotToPath() updated
2367 * last_saved_restart_lsn for any slot.
2368 */
2371}
2372
2373/*
2374 * Load all replication slots from disk into memory at server startup. This
2375 * needs to be run before we start crash recovery.
 *
 * Leftover "*.tmp" entries (from a crash during slot creation or deletion)
 * are removed instead of restored; a failure to remove one is reported and
 * the entry is skipped.
2376 */
2377void
2379{
2381 struct dirent *replication_de;
2382
2383 elog(DEBUG1, "starting up replication slots");
2384
2385 /* restore all slots by iterating over all on-disk entries */
2388 {
2389 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2391
2392 if (strcmp(replication_de->d_name, ".") == 0 ||
2393 strcmp(replication_de->d_name, "..") == 0)
2394 continue;
2395
2396 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2398
2399 /* we only create directories here, so skip anything that isn't ours */
2401 continue;
2402
2403 /* we crashed while a slot was being set up or deleted, clean up */
2404 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2405 {
2406 if (!rmtree(path, true))
2407 {
2409 (errmsg("could not remove directory \"%s\"",
2410 path)));
2411 continue;
2412 }
2414 continue;
2415 }
2416
2417 /* looks like a slot in a normal state, restore */
2419 }
2421
2422 /* currently no slots exist, we're done. */
2423 if (max_replication_slots <= 0)
2424 return;
2425
2426 /* Now that we have recovered all the data, compute replication xmin */
2429}
2430
2431/* ----
2432 * Manipulation of on-disk state of replication slots
2433 *
2434 * NB: none of the routines below should take any notice whether a slot is the
2435 * current one or not, that's all handled a layer above.
2436 * ----
2437 */
/*
 * Create the on-disk directory and state file for a newly allocated slot.
 *
 * The directory is first built as "<name>.tmp" under PG_REPLSLOT_DIR and
 * fsynced. The state file is then written through SaveSlotToPath(), and the
 * directory is renamed to its final name. A crash before the rename
 * therefore leaves only a ".tmp" directory. Any error is raised at ERROR
 * level; a failure to fsync after the rename happens inside a critical
 * section (see below).
 */
2438static void
2440{
2441 char tmppath[MAXPGPATH];
2442 char path[MAXPGPATH];
2443 struct stat st;
2444
2445 /*
2446 * No need to take out the io_in_progress_lock, nobody else can see this
2447 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2448 * takes out the lock, if we'd take the lock here, we'd deadlock.
2449 */
2450
2451 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2452 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2453
2454 /*
2455 * It's just barely possible that some previous effort to create or drop a
2456 * slot with this name left a temp directory lying around. If that seems
2457 * to be the case, try to remove it. If the rmtree() fails, we'll error
2458 * out at the MakePGDirectory() below, so we don't bother checking
2459 * success.
2460 */
2461 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2462 rmtree(tmppath, true);
2463
2464 /* Create and fsync the temporary slot directory. */
2465 if (MakePGDirectory(tmppath) < 0)
2466 ereport(ERROR,
2468 errmsg("could not create directory \"%s\": %m",
2469 tmppath)));
2470 fsync_fname(tmppath, true);
2471
2472 /* Write the actual state file. */
2473 slot->dirty = true; /* signal that we really need to write */
2475
2476 /* Rename the directory into place. */
2477 if (rename(tmppath, path) != 0)
2478 ereport(ERROR,
2480 errmsg("could not rename file \"%s\" to \"%s\": %m",
2481 tmppath, path)));
2482
2483 /*
2484 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2485 * would persist after an OS crash or not - so, force a restart. The
2486 * restart would try to fsync this again till it works.
2487 */
2489
2490 fsync_fname(path, true);
2492
2494}
2495
2496/*
2497 * Shared functionality between saving and creating a replication slot.
2498 */
2499static void
2500SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2501{
2502 char tmppath[MAXPGPATH];
2503 char path[MAXPGPATH];
2504 int fd;
2506 bool was_dirty;
2507
2508 /* first check whether there's something to write out */
2509 SpinLockAcquire(&slot->mutex);
2510 was_dirty = slot->dirty;
2511 slot->just_dirtied = false;
2512 SpinLockRelease(&slot->mutex);
2513
2514 /* and don't do anything if there's nothing to write */
2515 if (!was_dirty)
2516 return;
2517
2519
2520 /* silence valgrind :( */
2521 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2522
2523 sprintf(tmppath, "%s/state.tmp", dir);
2524 sprintf(path, "%s/state", dir);
2525
2527 if (fd < 0)
2528 {
2529 /*
2530 * If not an ERROR, then release the lock before returning. In case
2531 * of an ERROR, the error recovery path automatically releases the
2532 * lock, but no harm in explicitly releasing even in that case. Note
2533 * that LWLockRelease() could affect errno.
2534 */
2535 int save_errno = errno;
2536
2538 errno = save_errno;
2539 ereport(elevel,
2541 errmsg("could not create file \"%s\": %m",
2542 tmppath)));
2543 return;
2544 }
2545
2546 cp.magic = SLOT_MAGIC;
2547 INIT_CRC32C(cp.checksum);
2548 cp.version = SLOT_VERSION;
2550
2551 SpinLockAcquire(&slot->mutex);
2552
2553 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2554
2555 SpinLockRelease(&slot->mutex);
2556
2557 COMP_CRC32C(cp.checksum,
2560 FIN_CRC32C(cp.checksum);
2561
2562 errno = 0;
2564 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2565 {
2566 int save_errno = errno;
2567
2570 unlink(tmppath);
2572
2573 /* if write didn't set errno, assume problem is no disk space */
2575 ereport(elevel,
2577 errmsg("could not write to file \"%s\": %m",
2578 tmppath)));
2579 return;
2580 }
2582
2583 /* fsync the temporary file */
2585 if (pg_fsync(fd) != 0)
2586 {
2587 int save_errno = errno;
2588
2591 unlink(tmppath);
2593
2594 errno = save_errno;
2595 ereport(elevel,
2597 errmsg("could not fsync file \"%s\": %m",
2598 tmppath)));
2599 return;
2600 }
2602
2603 if (CloseTransientFile(fd) != 0)
2604 {
2605 int save_errno = errno;
2606
2607 unlink(tmppath);
2609
2610 errno = save_errno;
2611 ereport(elevel,
2613 errmsg("could not close file \"%s\": %m",
2614 tmppath)));
2615 return;
2616 }
2617
2618 /* rename to permanent file, fsync file and directory */
2619 if (rename(tmppath, path) != 0)
2620 {
2621 int save_errno = errno;
2622
2623 unlink(tmppath);
2625
2626 errno = save_errno;
2627 ereport(elevel,
2629 errmsg("could not rename file \"%s\" to \"%s\": %m",
2630 tmppath, path)));
2631 return;
2632 }
2633
2634 /*
2635 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2636 */
2638
2639 fsync_fname(path, false);
2640 fsync_fname(dir, true);
2642
2644
2645 /*
2646 * Successfully wrote: unset the dirty bit unless somebody re-dirtied the
2647 * slot meanwhile, and remember the saved confirmed_flush and restart_lsn.
2648 */
2649 SpinLockAcquire(&slot->mutex);
2650 if (!slot->just_dirtied)
2651 slot->dirty = false;
2652 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2653 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2654 SpinLockRelease(&slot->mutex);
2655
2657}
2658
2659/*
2660 * Load a single slot from disk into memory.
2661 */
2662static void
2664{
2666 int i;
2667 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2668 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2669 int fd;
2670 bool restored = false;
2671 int readBytes;
2672 pg_crc32c checksum;
2673 TimestampTz now = 0;
2674
2675 /* no need to lock here, no concurrent access allowed yet */
2676
2677 /* delete temp file if it exists */
2678 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2679 sprintf(path, "%s/state.tmp", slotdir);
2680 if (unlink(path) < 0 && errno != ENOENT)
2681 ereport(PANIC,
2683 errmsg("could not remove file \"%s\": %m", path)));
2684
2685 sprintf(path, "%s/state", slotdir);
2686
2687 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2688
2689 /* on some operating systems fsyncing a file requires O_RDWR */
2691
2692 /*
2693 * We do not need to handle this as we are rename()ing the directory into
2694 * place only after we fsync()ed the state file.
2695 */
2696 if (fd < 0)
2697 ereport(PANIC,
2699 errmsg("could not open file \"%s\": %m", path)));
2700
2701 /*
2702 * Sync state file before we're reading from it. We might have crashed
2703 * while it wasn't synced yet and we shouldn't continue on that basis.
2704 */
2706 if (pg_fsync(fd) != 0)
2707 ereport(PANIC,
2709 errmsg("could not fsync file \"%s\": %m",
2710 path)));
2712
2713 /* Also sync the parent directory */
2715 fsync_fname(slotdir, true);
2717
2718 /* read part of statefile that's guaranteed to be version independent */
2723 {
2724 if (readBytes < 0)
2725 ereport(PANIC,
2727 errmsg("could not read file \"%s\": %m", path)));
2728 else
2729 ereport(PANIC,
2731 errmsg("could not read file \"%s\": read %d of %zu",
2732 path, readBytes,
2734 }
2735
2736 /* verify magic */
2737 if (cp.magic != SLOT_MAGIC)
2738 ereport(PANIC,
2740 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2741 path, cp.magic, SLOT_MAGIC)));
2742
2743 /* verify version */
2744 if (cp.version != SLOT_VERSION)
2745 ereport(PANIC,
2747 errmsg("replication slot file \"%s\" has unsupported version %u",
2748 path, cp.version)));
2749
2750 /* boundary check on length */
2751 if (cp.length != ReplicationSlotOnDiskV2Size)
2752 ereport(PANIC,
2754 errmsg("replication slot file \"%s\" has corrupted length %u",
2755 path, cp.length)));
2756
2757 /* Now that we know the size, read the entire file */
2759 readBytes = read(fd,
2761 cp.length);
2763 if (readBytes != cp.length)
2764 {
2765 if (readBytes < 0)
2766 ereport(PANIC,
2768 errmsg("could not read file \"%s\": %m", path)));
2769 else
2770 ereport(PANIC,
2772 errmsg("could not read file \"%s\": read %d of %zu",
2773 path, readBytes, (Size) cp.length)));
2774 }
2775
2776 if (CloseTransientFile(fd) != 0)
2777 ereport(PANIC,
2779 errmsg("could not close file \"%s\": %m", path)));
2780
2781 /* now verify the CRC */
2782 INIT_CRC32C(checksum);
2783 COMP_CRC32C(checksum,
2786 FIN_CRC32C(checksum);
2787
2788 if (!EQ_CRC32C(checksum, cp.checksum))
2789 ereport(PANIC,
2790 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2791 path, checksum, cp.checksum)));
2792
2793 /*
2794 * If we crashed with a non-persistent (ephemeral or temporary) slot
2795 * active, don't restore it but delete it.
2796 */
2797 if (cp.slotdata.persistency != RS_PERSISTENT)
2798 {
2799 if (!rmtree(slotdir, true))
2800 {
2802 (errmsg("could not remove directory \"%s\"",
2803 slotdir)));
2804 }
2806 return;
2807 }
2808
2809 /*
2810 * Verify that requirements for the specific slot type are met. That's
2811 * important because if these aren't met we're not guaranteed to retain
2812 * all the necessary resources for the slot.
2813 *
2814 * NB: We have to do so *after* the above checks for ephemeral slots,
2815 * because otherwise a slot that shouldn't exist anymore could prevent
2816 * restarts.
2817 *
2818 * NB: Changing the requirements here also requires adapting
2819 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2820 */
2821 if (cp.slotdata.database != InvalidOid)
2822 {
2824 ereport(FATAL,
2826 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2827 NameStr(cp.slotdata.name)),
2828 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2829
2830 /*
2831 * In standby mode, the hot standby must be enabled. This check is
2832 * necessary to ensure logical slots are invalidated when they become
2833 * incompatible due to insufficient wal_level. Otherwise, if the
2834 * primary disables logical decoding (effective_wal_level < logical)
2835 * while hot standby is disabled, logical slots would remain valid
2836 * even after promotion.
2837 */
2839 ereport(FATAL,
2841 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2842 NameStr(cp.slotdata.name)),
2843 errhint("Change \"hot_standby\" to be \"on\".")));
2844 }
2845 else if (wal_level < WAL_LEVEL_REPLICA)
2846 ereport(FATAL,
2848 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2849 NameStr(cp.slotdata.name)),
2850 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2851
2852 /* nothing can be active yet, don't lock anything */
2853 for (i = 0; i < max_replication_slots; i++)
2854 {
2855 ReplicationSlot *slot;
2856
2858
2859 if (slot->in_use)
2860 continue;
2861
2862 /* restore the entire set of persistent data */
2863 memcpy(&slot->data, &cp.slotdata,
2865
2866 /* initialize in memory state */
2867 slot->effective_xmin = cp.slotdata.xmin;
2868 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2869 slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2870 slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2871
2876
2877 slot->in_use = true;
2878 slot->active_pid = 0;
2879
2880 /*
2881 * Set the time since the slot has become inactive after loading the
2882 * slot from the disk into memory. Whoever acquires the slot i.e.
2883 * makes the slot active will reset it. Use the same inactive_since
2884 * time for all the slots.
2885 */
2886 if (now == 0)
2888
2890
2891 restored = true;
2892 break;
2893 }
2894
2895 if (!restored)
2896 ereport(FATAL,
2897 (errmsg("too many replication slots active before shutdown"),
2898 errhint("Increase \"max_replication_slots\" and try again.")));
2899}
2900
2901/*
2902 * Maps an invalidation reason for a replication slot to
2903 * ReplicationSlotInvalidationCause.
2904 */
2906GetSlotInvalidationCause(const char *cause_name)
2907{
2908 Assert(cause_name);
2909
2910 /* Search lookup table for the cause having this name */
2911 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2912 {
2913 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2915 }
2916
2917 Assert(false);
2918 return RS_INVAL_NONE; /* to keep compiler quiet */
2919}
2920
2921/*
2922 * Maps a ReplicationSlotInvalidationCause to the invalidation
2923 * reason for a replication slot.
2924 */
2925const char *
2927{
2928 /* Search lookup table for the name of this cause */
2929 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2930 {
2931 if (SlotInvalidationCauses[i].cause == cause)
2933 }
2934
2935 Assert(false);
2936 return "none"; /* to keep compiler quiet */
2937}
2938
2939/*
2940 * A helper function to validate slots specified in GUC synchronized_standby_slots.
2941 *
2942 * The rawname will be parsed, and the result will be saved into *elemlist.
2943 */
2944static bool
2946{
2947 /* Verify syntax and parse string into a list of identifiers */
2949 {
2950 GUC_check_errdetail("List syntax is invalid.");
2951 return false;
2952 }
2953
2954 /* Iterate the list to validate each slot name */
2955 foreach_ptr(char, name, *elemlist)
2956 {
2957 int err_code;
2958 char *err_msg = NULL;
2959 char *err_hint = NULL;
2960
2962 &err_msg, &err_hint))
2963 {
2965 GUC_check_errdetail("%s", err_msg);
2966 if (err_hint != NULL)
2968 return false;
2969 }
2970 }
2971
2972 return true;
2973}
2974
2975/*
2976 * GUC check_hook for synchronized_standby_slots
2977 */
2978bool
2980{
2981 char *rawname;
2982 char *ptr;
2983 List *elemlist;
2984 int size;
2985 bool ok;
2987
2988 if ((*newval)[0] == '\0')
2989 return true;
2990
2991 /* Need a modifiable copy of the GUC string */
2993
2994 /* Now verify if the specified slots exist and have correct type */
2996
2997 if (!ok || elemlist == NIL)
2998 {
2999 pfree(rawname);
3001 return ok;
3002 }
3003
3004 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3005 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3006 foreach_ptr(char, slot_name, elemlist)
3007 size += strlen(slot_name) + 1;
3008
3009 /* GUC extra value must be guc_malloc'd, not palloc'd */
3010 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3011 if (!config)
3012 return false;
3013
3014 /* Transform the data into SyncStandbySlotsConfigData */
3015 config->nslotnames = list_length(elemlist);
3016
3017 ptr = config->slot_names;
3018 foreach_ptr(char, slot_name, elemlist)
3019 {
3020 strcpy(ptr, slot_name);
3021 ptr += strlen(slot_name) + 1;
3022 }
3023
3024 *extra = config;
3025
3026 pfree(rawname);
3028 return true;
3029}
3030
3031/*
3032 * GUC assign_hook for synchronized_standby_slots
3033 */
3034void
3036{
3037 /*
3038 * The standby slots may have changed, so we must recompute the oldest
3039 * LSN.
3040 */
3042
3044}
3045
3046/*
3047 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3048 */
3049bool
3050SlotExistsInSyncStandbySlots(const char *slot_name)
3051{
3052 const char *standby_slot_name;
3053
3054 /* Return false if there is no value in synchronized_standby_slots */
3056 return false;
3057
3058 /*
3059 * XXX: We are not expecting this list to be long so a linear search
3060 * shouldn't hurt but if that turns out not to be true then we can cache
3061 * this information for each WalSender as well.
3062 */
3064 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3065 {
3066 if (strcmp(standby_slot_name, slot_name) == 0)
3067 return true;
3068
3070 }
3071
3072 return false;
3073}
3074
3075/*
3076 * Return true if the slots specified in synchronized_standby_slots have caught up to
3077 * the given WAL location, false otherwise.
3078 *
3079 * The elevel parameter specifies the error level used for logging messages
3080 * related to slots that do not exist, are invalidated, or are inactive.
3081 */
3082bool
3084{
3085 const char *name;
3086 int caught_up_slot_num = 0;
3088
3089 /*
3090 * Don't need to wait for the standbys to catch up if there is no value in
3091 * synchronized_standby_slots.
3092 */
3094 return true;
3095
3096 /*
3097 * Don't need to wait for the standbys to catch up if we are on a standby
3098 * server, since we do not support syncing slots to cascading standbys.
3099 */
3100 if (RecoveryInProgress())
3101 return true;
3102
3103 /*
3104 * Don't need to wait for the standbys to catch up if they are already
3105 * beyond the specified WAL location.
3106 */
3109 return true;
3110
3111 /*
3112 * To prevent concurrent slot dropping and creation while filtering the
3113 * slots, take the ReplicationSlotControlLock outside of the loop.
3114 */
3116
3118 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3119 {
3120 XLogRecPtr restart_lsn;
3121 bool invalidated;
3122 bool inactive;
3123 ReplicationSlot *slot;
3124
3125 slot = SearchNamedReplicationSlot(name, false);
3126
3127 /*
3128 * If a slot name provided in synchronized_standby_slots does not
3129 * exist, report a message and exit the loop.
3130 */
3131 if (!slot)
3132 {
3133 ereport(elevel,
3135 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3136 name, "synchronized_standby_slots"),
3137 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3138 name),
3139 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3140 name, "synchronized_standby_slots"));
3141 break;
3142 }
3143
3144 /* Same as above: if a slot is not physical, exit the loop. */
3145 if (SlotIsLogical(slot))
3146 {
3147 ereport(elevel,
3149 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3150 name, "synchronized_standby_slots"),
3151 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3152 name),
3153 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3154 name, "synchronized_standby_slots"));
3155 break;
3156 }
3157
3158 SpinLockAcquire(&slot->mutex);
3159 restart_lsn = slot->data.restart_lsn;
3160 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3161 inactive = slot->active_pid == 0;
3162 SpinLockRelease(&slot->mutex);
3163
3164 if (invalidated)
3165 {
3166 /* Specified physical slot has been invalidated */
3167 ereport(elevel,
3169 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3170 name, "synchronized_standby_slots"),
3171 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3172 name),
3173 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3174 name, "synchronized_standby_slots"));
3175 break;
3176 }
3177
3178 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3179 {
3180 /* Log a message if no active_pid for this physical slot */
3181 if (inactive)
3182 ereport(elevel,
3184 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3185 name, "synchronized_standby_slots"),
3186 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3187 name),
3188 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3189 name, "synchronized_standby_slots"));
3190
3191 /* Stop checking: the current slot hasn't caught up yet. */
3192 break;
3193 }
3194
3195 Assert(restart_lsn >= wait_for_lsn);
3196
3198 min_restart_lsn > restart_lsn)
3199 min_restart_lsn = restart_lsn;
3200
3202
3203 name += strlen(name) + 1;
3204 }
3205
3207
3208 /*
3209 * Return false if not all the standbys have caught up to the specified
3210 * WAL location.
3211 */
3213 return false;
3214
3215 /* The ss_oldest_flush_lsn must not retreat. */
3218
3220
3221 return true;
3222}
3223
3224/*
3225 * Wait for physical standbys to confirm receiving the given lsn.
3226 *
3227 * Used by logical decoding SQL functions. It waits for physical standbys
3228 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3229 */
3230void
3232{
3233 /*
3234 * Don't need to wait for the standby to catch up if the current acquired
3235 * slot is not a logical failover slot, or there is no value in
3236 * synchronized_standby_slots.
3237 */
3239 return;
3240
3242
3243 for (;;)
3244 {
3246
3248 {
3249 ConfigReloadPending = false;
3251 }
3252
3253 /* Exit if done waiting for every slot. */
3255 break;
3256
3257 /*
3258 * Wait for the slots in the synchronized_standby_slots to catch up,
3259 * but use a timeout (1s) so we can also check if the
3260 * synchronized_standby_slots has been changed.
3261 */
3264 }
3265
3267}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition timestamp.c:1721
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition timestamp.c:1795
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
#define NameStr(name)
Definition c.h:775
#define ngettext(s, p, n)
Definition c.h:1170
#define Assert(condition)
Definition c.h:883
#define PG_BINARY
Definition c.h:1281
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:490
uint64_t uint64
Definition c.h:557
#define pg_unreachable()
Definition c.h:351
uint32_t uint32
Definition c.h:556
#define lengthof(array)
Definition c.h:813
#define MemSet(start, val, len)
Definition c.h:1023
#define StaticAssertDecl(condition, errmessage)
Definition c.h:952
uint32 TransactionId
Definition c.h:676
size_t Size
Definition c.h:629
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition timestamp.h:39
int errmsg_internal(const char *fmt,...)
Definition elog.c:1170
int errdetail_internal(const char *fmt,...)
Definition elog.c:1243
int errcode_for_file_access(void)
Definition elog.c:886
int errdetail(const char *fmt,...)
Definition elog.c:1216
int errhint_internal(const char *fmt,...)
Definition elog.c:1352
int errhint(const char *fmt,...)
Definition elog.c:1330
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define _(x)
Definition elog.c:91
#define LOG
Definition elog.h:31
#define FATAL
Definition elog.h:41
#define WARNING
Definition elog.h:36
#define PANIC
Definition elog.h:42
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int MakePGDirectory(const char *directoryName)
Definition fd.c:3959
int FreeDir(DIR *dir)
Definition fd.c:3005
int CloseTransientFile(int fd)
Definition fd.c:2851
void fsync_fname(const char *fname, bool isdir)
Definition fd.c:753
DIR * AllocateDir(const char *dirname)
Definition fd.c:2887
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition fd.c:2953
int pg_fsync(int fd)
Definition fd.c:386
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2674
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition file_utils.c:547
PGFileType
Definition file_utils.h:19
@ PGFILETYPE_DIR
Definition file_utils.h:23
@ PGFILETYPE_ERROR
Definition file_utils.h:20
bool IsBinaryUpgrade
Definition globals.c:121
int MyProcPid
Definition globals.c:47
bool IsUnderPostmaster
Definition globals.c:120
Oid MyDatabaseId
Definition globals.c:94
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
void GUC_check_errcode(int sqlerrcode)
Definition guc.c:6628
void * guc_malloc(int elevel, size_t size)
Definition guc.c:636
#define newval
#define GUC_check_errdetail
Definition guc.h:505
GucSource
Definition guc.h:112
@ PGC_SIGHUP
Definition guc.h:75
#define GUC_check_errhint
Definition guc.h:509
#define IS_INJECTION_POINT_ATTACHED(name)
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
int i
Definition isn.c:77
bool IsLogicalLauncher(void)
Definition launcher.c:1587
void list_free(List *list)
Definition list.c:1546
void RequestDisableLogicalDecoding(void)
Definition logicalctl.c:433
bool LWLockHeldByMe(LWLock *lock)
Definition lwlock.c:1911
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1955
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
#define START_CRIT_SECTION()
Definition miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
@ B_STARTUP
Definition miscadmin.h:365
#define END_CRIT_SECTION()
Definition miscadmin.h:152
Oid GetUserId(void)
Definition miscinit.c:469
BackendType MyBackendType
Definition miscinit.c:64
bool has_rolreplication(Oid roleid)
Definition miscinit.c:688
void namestrcpy(Name name, const char *str)
Definition name.c:233
void * arg
#define ERRCODE_DATA_CORRUPTED
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define EQ_CRC32C(c1, c2)
Definition pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static bool two_phase
static bool failover
static rewind_source * source
Definition pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition port.h:262
#define snprintf
Definition port.h:260
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition procarray.c:3922
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:284
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition procsignal.h:46
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50
Size add_size(Size s1, Size s2)
Definition shmem.c:495
Size mul_size(Size s1, Size s2)
Definition shmem.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:389
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition slot.c:573
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:620
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition slot.c:1962
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition slot.c:113
char * synchronized_standby_slots
Definition slot.c:164
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition slot.c:3035
#define ReplicationSlotOnDiskChecksummedSize
Definition slot.c:135
void CheckPointReplicationSlots(bool is_shutdown)
Definition slot.c:2300
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition slot.c:378
int idle_replication_slot_timeout_secs
Definition slot.c:158
void ReplicationSlotDropAcquired(void)
Definition slot.c:1031
void ReplicationSlotMarkDirty(void)
Definition slot.c:1173
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition slot.c:1870
void ReplicationSlotReserveWal(void)
Definition slot.c:1693
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition slot.c:1446
static XLogRecPtr ss_oldest_flush_lsn
Definition slot.c:173
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition slot.c:311
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition slot.c:1507
#define ReplicationSlotOnDiskNotChecksummedSize
Definition slot.c:132
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition slot.c:1367
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition slot.c:2906
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1215
static void RestoreSlotFromDisk(const char *name)
Definition slot.c:2663
void ReplicationSlotPersist(void)
Definition slot.c:1190
bool CheckLogicalSlotExists(void)
Definition slot.c:1612
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition slot.c:1772
ReplicationSlot * MyReplicationSlot
Definition slot.c:148
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition slot.c:2500
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:909
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3050
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition slot.c:2945
void ReplicationSlotSave(void)
Definition slot.c:1155
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:540
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition slot.c:2439
#define ReplicationSlotOnDiskV2Size
Definition slot.c:138
void CheckSlotPermissions(void)
Definition slot.c:1676
bool ReplicationSlotName(int index, Name name)
Definition slot.c:589
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition slot.c:2979
void ReplicationSlotsShmemInit(void)
Definition slot.c:206
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:266
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:949
void ReplicationSlotRelease(void)
Definition slot.c:758
int max_replication_slots
Definition slot.c:151
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:145
#define SLOT_VERSION
Definition slot.c:142
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition slot.c:3231
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3083
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1297
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:857
void ReplicationSlotInitialize(void)
Definition slot.c:241
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition slot.c:1048
void StartupReplicationSlots(void)
Definition slot.c:2378
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition slot.c:1854
void CheckSlotRequirements(void)
Definition slot.c:1654
#define SLOT_MAGIC
Definition slot.c:141
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition slot.c:2196
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition slot.c:167
#define ReplicationSlotOnDiskConstantSize
Definition slot.c:129
Size ReplicationSlotsShmemSize(void)
Definition slot.c:188
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2926
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition slot.c:250
static bool IsSlotForConflictCheck(const char *name)
Definition slot.c:361
#define CONFLICT_DETECTION_SLOT
Definition slot.h:28
#define RS_INVAL_MAX_CAUSES
Definition slot.h:72
ReplicationSlotPersistency
Definition slot.h:44
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
#define SlotIsPhysical(slot)
Definition slot.h:284
#define PG_REPLSLOT_DIR
Definition slot.h:21
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition slot.h:68
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ RS_INVAL_NONE
Definition slot.h:60
#define SlotIsLogical(slot)
Definition slot.h:285
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition slot.h:303
@ SS_SKIP_NONE
Definition slot.h:82
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1882
#define SpinLockInit(lock)
Definition spin.h:57
#define SpinLockRelease(lock)
Definition spin.h:61
#define SpinLockAcquire(lock)
Definition spin.h:59
PGPROC * MyProc
Definition proc.c:67
PROC_HDR * ProcGlobal
Definition proc.c:79
XLogRecPtr LogStandbySnapshot(void)
Definition standby.c:1282
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition dirent.c:26
Definition pg_list.h:54
uint8 statusFlags
Definition proc.h:265
int pgxactoff
Definition proc.h:201
uint8 * statusFlags
Definition proc.h:409
ReplicationSlot replication_slots[1]
Definition slot.h:296
ReplicationSlotPersistentData slotdata
Definition slot.c:83
pg_crc32c checksum
Definition slot.c:72
ReplicationSlotPersistency persistency
Definition slot.h:106
ReplicationSlotInvalidationCause invalidated
Definition slot.h:128
XLogRecPtr candidate_xmin_lsn
Definition slot.h:226
TransactionId effective_catalog_xmin
Definition slot.h:207
slock_t mutex
Definition slot.h:183
XLogRecPtr candidate_restart_valid
Definition slot.h:227
XLogRecPtr last_saved_confirmed_flush
Definition slot.h:235
pid_t active_pid
Definition slot.h:189
SlotSyncSkipReason slotsync_skip_reason
Definition slot.h:281
bool in_use
Definition slot.h:186
TransactionId effective_xmin
Definition slot.h:206
bool just_dirtied
Definition slot.h:192
XLogRecPtr last_saved_restart_lsn
Definition slot.h:268
XLogRecPtr candidate_restart_lsn
Definition slot.h:228
LWLock io_in_progress_lock
Definition slot.h:213
ConditionVariable active_cv
Definition slot.h:216
TransactionId candidate_catalog_xmin
Definition slot.h:225
ReplicationSlotPersistentData data
Definition slot.h:210
TimestampTz inactive_since
Definition slot.h:242
const char * cause_name
Definition slot.c:110
ReplicationSlotInvalidationCause cause
Definition slot.c:109
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition slot.c:101
ConditionVariable wal_confirm_rcv_cv
Definition type.h:96
Definition c.h:770
unsigned short st_mode
Definition win32_port.h:258
#define InvalidTransactionId
Definition transam.h:31
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define TransactionIdIsValid(xid)
Definition transam.h:41
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition varlena.c:2730
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
const char * name
bool am_walsender
Definition walsender.c:123
bool log_replication_commands
Definition walsender.c:133
WalSndCtlData * WalSndCtl
Definition walsender.c:117
#define stat
Definition win32_port.h:74
#define S_ISDIR(m)
Definition win32_port.h:315
#define kill(pid, sig)
Definition win32_port.h:490
bool RecoveryInProgress(void)
Definition xlog.c:6461
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3796
bool EnableHotStandby
Definition xlog.c:124
XLogRecPtr GetRedoRecPtr(void)
Definition xlog.c:6564
int wal_level
Definition xlog.c:134
int wal_segment_size
Definition xlog.c:146
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition xlog.c:2670
XLogRecPtr GetXLogInsertRecPtr(void)
Definition xlog.c:9596
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2784
@ WAL_LEVEL_REPLICA
Definition xlog.h:76
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint64 XLogSegNo
Definition xlogdefs.h:52
bool StandbyMode
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)