PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
slot.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the directory
24 * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 * contain the slot's own data. Additional data can be stored alongside that
26 * file if required. While the server is running, the state data is also
27 * cached in memory for efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include <unistd.h>
40#include <sys/stat.h>
41
42#include "access/transam.h"
44#include "access/xlogrecovery.h"
45#include "common/file_utils.h"
46#include "common/string.h"
47#include "miscadmin.h"
48#include "pgstat.h"
52#include "replication/slot.h"
54#include "storage/fd.h"
55#include "storage/ipc.h"
56#include "storage/proc.h"
57#include "storage/procarray.h"
58#include "utils/builtins.h"
59#include "utils/guc_hooks.h"
61#include "utils/varlena.h"
62
63/*
64 * Replication slot on-disk data structure.
65 */
67{
68 /* first part of this struct needs to be version independent */
69
70 /* data not covered by checksum */
73
74 /* data covered by checksum */
77
78 /*
79 * The actual data in the slot that follows can differ based on the above
80 * 'version'.
81 */
82
85
86/*
87 * Struct for the configuration of synchronized_standby_slots.
88 *
89 * Note: this must be a flat representation that can be held in a single chunk
90 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
91 * synchronized_standby_slots GUC.
92 */
93typedef struct
94{
95 /* Number of slot names in the slot_names[] */
97
98 /*
99 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
100 */
101 char slot_names[FLEXIBLE_ARRAY_MEMBER];
103
104/*
105 * Lookup table for slot invalidation causes.
106 */
108{
110 const char *cause_name;
112
114 {RS_INVAL_NONE, "none"},
115 {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 {RS_INVAL_HORIZON, "rows_removed"},
117 {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119};
120
121/*
122 * Ensure that the lookup table is up-to-date with the enums defined in
123 * ReplicationSlotInvalidationCause.
124 */
126 "array length mismatch");
127
128/* size of version independent data */
129#define ReplicationSlotOnDiskConstantSize \
130 offsetof(ReplicationSlotOnDisk, slotdata)
131/* size of the part of the slot not covered by the checksum */
132#define ReplicationSlotOnDiskNotChecksummedSize \
133 offsetof(ReplicationSlotOnDisk, version)
134/* size of the part covered by the checksum */
135#define ReplicationSlotOnDiskChecksummedSize \
136 sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
137/* size of the slot data that is version dependent */
138#define ReplicationSlotOnDiskV2Size \
139 sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
140
141#define SLOT_MAGIC 0x1051CA1 /* format identifier */
142#define SLOT_VERSION 5 /* version for new files */
143
144/* Control array for replication slot management */
146
147/* My backend's replication slot in the shared memory array */
149
150/* GUC variables */
151int max_replication_slots = 10; /* the maximum number of replication
152 * slots */
153
154/*
155 * Invalidate replication slots that have remained idle longer than this
156 * duration; '0' disables it.
157 */
159
160/*
161 * This GUC lists streaming replication standby server slot names that
162 * logical WAL sender processes will wait for.
163 */
165
166/* This is the parsed and cached configuration for synchronized_standby_slots */
168
169/*
170 * Oldest LSN that has been confirmed to be flushed to the standbys
171 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
172 */
174
175static void ReplicationSlotShmemExit(int code, Datum arg);
176static bool IsSlotForConflictCheck(const char *name);
177static void ReplicationSlotDropPtr(ReplicationSlot *slot);
178
179/* internal persistency functions */
180static void RestoreSlotFromDisk(const char *name);
181static void CreateSlotOnDisk(ReplicationSlot *slot);
182static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183
184/*
185 * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 */
187Size
189{
190 Size size = 0;
191
192 if (max_replication_slots == 0)
193 return size;
194
195 size = offsetof(ReplicationSlotCtlData, replication_slots);
196 size = add_size(size,
198
199 return size;
200}
201
202/*
203 * Allocate and initialize shared memory for replication slots.
204 */
205void
207{
208 bool found;
209
210 if (max_replication_slots == 0)
211 return;
212
214 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 &found);
216
217 if (!found)
218 {
219 int i;
220
221 /* First time through, so initialize */
223
224 for (i = 0; i < max_replication_slots; i++)
225 {
227
228 /* everything else is zeroed by the memset above */
229 SpinLockInit(&slot->mutex);
231 LWTRANCHE_REPLICATION_SLOT_IO);
233 }
234 }
235}
236
237/*
238 * Register the callback for replication slot cleanup and releasing.
239 */
240void
242{
244}
245
246/*
247 * Release and cleanup replication slots.
248 */
249static void
251{
252 /* Make sure active replication slots are released */
253 if (MyReplicationSlot != NULL)
255
256 /* Also cleanup all the temporary slots. */
258}
259
/*
 * Validate a replication slot name, reporting any problem at 'elevel'.
 *
 * This is a thin reporting wrapper: the actual checks are performed by
 * ReplicationSlotValidateNameInternal(); see the comments there for the
 * naming rules and the meaning of allow_reserved_name.
 *
 * Returns true when the name is acceptable.  Otherwise the failure is
 * reported via ereport() at 'elevel' and, if that call returns, false is
 * returned.
 */
bool
ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
							int elevel)
{
	int			code;
	char	   *msg = NULL;
	char	   *hint = NULL;

	/* Nothing to report when the name passes every check. */
	if (ReplicationSlotValidateNameInternal(name, allow_reserved_name,
											&code, &msg, &hint))
		return true;

	/*
	 * The strings handed back by ReplicationSlotValidateNameInternal() are
	 * already translated, so pass them through errmsg_internal() and
	 * errhint_internal() rather than errmsg() and errhint(); otherwise they
	 * would be translated a second time.
	 */
	ereport(elevel,
			errcode(code),
			errmsg_internal("%s", msg),
			(hint != NULL) ? errhint_internal("%s", hint) : 0);

	/* Only reached when ereport() returned; release the message strings. */
	pfree(msg);
	if (hint != NULL)
		pfree(hint);

	return false;
}
295
296/*
297 * Check whether the passed slot name is valid.
298 *
299 * An error will be reported for a reserved replication slot name if
300 * allow_reserved_name is set to false.
301 *
302 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
303 * the name to be used as a directory name on every supported OS.
304 *
305 * Returns true if the slot name is valid. Otherwise, returns false and stores
306 * the error code, error message, and optional hint in err_code, err_msg, and
307 * err_hint, respectively. The caller is responsible for freeing err_msg and
308 * err_hint, which are palloc'd.
309 */
310bool
311ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
312 int *err_code, char **err_msg, char **err_hint)
313{
314 const char *cp;
315
316 if (strlen(name) == 0)
317 {
318 *err_code = ERRCODE_INVALID_NAME;
319 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 *err_hint = NULL;
321 return false;
322 }
323
324 if (strlen(name) >= NAMEDATALEN)
325 {
326 *err_code = ERRCODE_NAME_TOO_LONG;
327 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 *err_hint = NULL;
329 return false;
330 }
331
332 for (cp = name; *cp; cp++)
333 {
334 if (!((*cp >= 'a' && *cp <= 'z')
335 || (*cp >= '0' && *cp <= '9')
336 || (*cp == '_')))
337 {
338 *err_code = ERRCODE_INVALID_NAME;
339 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 return false;
342 }
343 }
344
345 if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 {
347 *err_code = ERRCODE_RESERVED_NAME;
348 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
351 return false;
352 }
353
354 return true;
355}
356
357/*
358 * Return true if the replication slot name is "pg_conflict_detection".
359 */
360static bool
362{
363 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364}
365
366/*
367 * Create a new replication slot and mark it as used by this backend.
368 *
369 * name: Name of the slot
370 * db_specific: logical decoding is db specific; if the slot is going to
371 * be used for that pass true, otherwise false.
372 * two_phase: Allows decoding of prepared transactions. We allow this option
373 * to be enabled only at the slot creation time. If we allow this option
374 * to be changed during decoding then it is quite possible that we skip
375 * prepare first time because this option was not enabled. Now next time
376 * during getting changes, if the two_phase option is enabled it can skip
377 * prepare because by that time start decoding point has been moved. So the
378 * user will only get commit prepared.
379 * failover: If enabled, allows the slot to be synced to standbys so
380 * that logical replication can be resumed after failover.
381 * synced: True if the slot is synchronized from the primary server.
382 */
383void
384ReplicationSlotCreate(const char *name, bool db_specific,
385 ReplicationSlotPersistency persistency,
386 bool two_phase, bool failover, bool synced)
387{
388 ReplicationSlot *slot = NULL;
389 int i;
390
391 Assert(MyReplicationSlot == NULL);
392
393 /*
394 * The logical launcher or pg_upgrade may create or migrate an internal
395 * slot, so using a reserved name is allowed in these cases.
396 */
398 ERROR);
399
400 if (failover)
401 {
402 /*
403 * Do not allow users to create the failover enabled slots on the
404 * standby as we do not support sync to the cascading standby.
405 *
406 * However, failover enabled slots can be created during slot
407 * synchronization because we need to retain the same values as the
408 * remote slot.
409 */
412 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
413 errmsg("cannot enable failover for a replication slot created on the standby"));
414
415 /*
416 * Do not allow users to create failover enabled temporary slots,
417 * because temporary slots will not be synced to the standby.
418 *
419 * However, failover enabled temporary slots can be created during
420 * slot synchronization. See the comments atop slotsync.c for details.
421 */
422 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
424 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
425 errmsg("cannot enable failover for a temporary replication slot"));
426 }
427
428 /*
429 * If some other backend ran this code concurrently with us, we'd likely
430 * both allocate the same slot, and that would be bad. We'd also be at
431 * risk of missing a name collision. Also, we don't want to try to create
432 * a new slot while somebody's busy cleaning up an old one, because we
433 * might both be monkeying with the same directory.
434 */
435 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
436
437 /*
438 * Check for name collision, and identify an allocatable slot. We need to
439 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
440 * else can change the in_use flags while we're looking at them.
441 */
442 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
443 for (i = 0; i < max_replication_slots; i++)
444 {
446
447 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
450 errmsg("replication slot \"%s\" already exists", name)));
451 if (!s->in_use && slot == NULL)
452 slot = s;
453 }
454 LWLockRelease(ReplicationSlotControlLock);
455
456 /* If all slots are in use, we're out of luck. */
457 if (slot == NULL)
459 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
460 errmsg("all replication slots are in use"),
461 errhint("Free one or increase \"max_replication_slots\".")));
462
463 /*
464 * Since this slot is not in use, nobody should be looking at any part of
465 * it other than the in_use field unless they're trying to allocate it.
466 * And since we hold ReplicationSlotAllocationLock, nobody except us can
467 * be doing that. So it's safe to initialize the slot.
468 */
469 Assert(!slot->in_use);
470 Assert(slot->active_pid == 0);
471
472 /* first initialize persistent data */
473 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
474 namestrcpy(&slot->data.name, name);
475 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
476 slot->data.persistency = persistency;
477 slot->data.two_phase = two_phase;
479 slot->data.failover = failover;
480 slot->data.synced = synced;
481
482 /* and then data only present in shared memory */
483 slot->just_dirtied = false;
484 slot->dirty = false;
493 slot->inactive_since = 0;
494
495 /*
496 * Create the slot on disk. We haven't actually marked the slot allocated
497 * yet, so no special cleanup is required if this errors out.
498 */
499 CreateSlotOnDisk(slot);
500
501 /*
502 * We need to briefly prevent any other backend from iterating over the
503 * slots while we flip the in_use flag. We also need to set the active
504 * flag while holding the ControlLock as otherwise a concurrent
505 * ReplicationSlotAcquire() could acquire the slot as well.
506 */
507 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
508
509 slot->in_use = true;
510
511 /* We can now mark the slot active, and that makes it our slot. */
512 SpinLockAcquire(&slot->mutex);
513 Assert(slot->active_pid == 0);
514 slot->active_pid = MyProcPid;
515 SpinLockRelease(&slot->mutex);
516 MyReplicationSlot = slot;
517
518 LWLockRelease(ReplicationSlotControlLock);
519
520 /*
521 * Create statistics entry for the new logical slot. We don't collect any
522 * stats for physical slots, so no need to create an entry for the same.
523 * See ReplicationSlotDropPtr for why we need to do this before releasing
524 * ReplicationSlotAllocationLock.
525 */
526 if (SlotIsLogical(slot))
528
529 /*
530 * Now that the slot has been marked as in_use and active, it's safe to
531 * let somebody else try to allocate a slot.
532 */
533 LWLockRelease(ReplicationSlotAllocationLock);
534
535 /* Let everybody know we've modified this slot */
537}
538
539/*
540 * Search for the named replication slot.
541 *
542 * Return the replication slot if found, otherwise NULL.
543 */
545SearchNamedReplicationSlot(const char *name, bool need_lock)
546{
547 int i;
548 ReplicationSlot *slot = NULL;
549
550 if (need_lock)
551 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
552
553 for (i = 0; i < max_replication_slots; i++)
554 {
556
557 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
558 {
559 slot = s;
560 break;
561 }
562 }
563
564 if (need_lock)
565 LWLockRelease(ReplicationSlotControlLock);
566
567 return slot;
568}
569
570/*
571 * Return the index of the replication slot in
572 * ReplicationSlotCtl->replication_slots.
573 *
574 * This is mainly useful to have an efficient key for storing replication slot
575 * stats.
576 */
577int
579{
581 slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
582
584}
585
586/*
587 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
588 * the slot's name and true is returned.
589 *
590 * This likely is only useful for pgstat_replslot.c during shutdown, in other
591 * cases there are obvious TOCTOU issues.
592 */
593bool
595{
596 ReplicationSlot *slot;
597 bool found;
598
600
601 /*
602 * Ensure that the slot cannot be dropped while we copy the name. Don't
603 * need the spinlock as the name of an existing slot cannot change.
604 */
605 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
606 found = slot->in_use;
607 if (slot->in_use)
609 LWLockRelease(ReplicationSlotControlLock);
610
611 return found;
612}
613
614/*
615 * Find a previously created slot and mark it as used by this process.
616 *
617 * An error is raised if nowait is true and the slot is currently in use. If
618 * nowait is false, we sleep until the slot is released by the owning process.
619 *
620 * An error is raised if error_if_invalid is true and the slot is found to
621 * be invalid. It should always be set to true, except when we are temporarily
622 * acquiring the slot and don't intend to change it.
623 */
624void
625ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
626{
628 int active_pid;
629
630 Assert(name != NULL);
631
632retry:
633 Assert(MyReplicationSlot == NULL);
634
635 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
636
637 /* Check if the slot exists with the given name. */
639 if (s == NULL || !s->in_use)
640 {
641 LWLockRelease(ReplicationSlotControlLock);
642
644 (errcode(ERRCODE_UNDEFINED_OBJECT),
645 errmsg("replication slot \"%s\" does not exist",
646 name)));
647 }
648
649 /*
650 * Do not allow users to acquire the reserved slot. This scenario may
651 * occur if the launcher that owns the slot has terminated unexpectedly
652 * due to an error, and a backend process attempts to reuse the slot.
653 */
656 errcode(ERRCODE_UNDEFINED_OBJECT),
657 errmsg("cannot acquire replication slot \"%s\"", name),
658 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
659
660 /*
661 * This is the slot we want; check if it's active under some other
662 * process. In single user mode, we don't need this check.
663 */
665 {
666 /*
667 * Get ready to sleep on the slot in case it is active. (We may end
668 * up not sleeping, but we don't want to do this while holding the
669 * spinlock.)
670 */
671 if (!nowait)
673
674 /*
675 * It is important to reset the inactive_since under spinlock here to
676 * avoid race conditions with slot invalidation. See comments related
677 * to inactive_since in InvalidatePossiblyObsoleteSlot.
678 */
680 if (s->active_pid == 0)
682 active_pid = s->active_pid;
685 }
686 else
687 {
688 s->active_pid = active_pid = MyProcPid;
690 }
691 LWLockRelease(ReplicationSlotControlLock);
692
693 /*
694 * If we found the slot but it's already active in another process, we
695 * wait until the owning process signals us that it's been released, or
696 * error out.
697 */
698 if (active_pid != MyProcPid)
699 {
700 if (!nowait)
701 {
702 /* Wait here until we get signaled, and then restart */
704 WAIT_EVENT_REPLICATION_SLOT_DROP);
706 goto retry;
707 }
708
710 (errcode(ERRCODE_OBJECT_IN_USE),
711 errmsg("replication slot \"%s\" is active for PID %d",
712 NameStr(s->data.name), active_pid)));
713 }
714 else if (!nowait)
715 ConditionVariableCancelSleep(); /* no sleep needed after all */
716
717 /* We made this slot active, so it's ours now. */
719
720 /*
721 * We need to check for invalidation after making the slot ours to avoid
722 * the possible race condition with the checkpointer that can otherwise
723 * invalidate the slot immediately after the check.
724 */
725 if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
727 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
728 errmsg("can no longer access replication slot \"%s\"",
729 NameStr(s->data.name)),
730 errdetail("This replication slot has been invalidated due to \"%s\".",
732
733 /* Let everybody know we've modified this slot */
735
736 /*
737 * The call to pgstat_acquire_replslot() protects against stats for a
738 * different slot, from before a restart or such, being present during
739 * pgstat_report_replslot().
740 */
741 if (SlotIsLogical(s))
743
744
745 if (am_walsender)
746 {
749 ? errmsg("acquired logical replication slot \"%s\"",
750 NameStr(s->data.name))
751 : errmsg("acquired physical replication slot \"%s\"",
752 NameStr(s->data.name)));
753 }
754}
755
756/*
757 * Release the replication slot that this backend considers itself to own.
758 *
759 * This or another backend can re-acquire the slot later.
760 * Resources this slot requires will be preserved.
761 */
762void
764{
766 char *slotname = NULL; /* keep compiler quiet */
767 bool is_logical = false; /* keep compiler quiet */
768 TimestampTz now = 0;
769
770 Assert(slot != NULL && slot->active_pid != 0);
771
772 if (am_walsender)
773 {
774 slotname = pstrdup(NameStr(slot->data.name));
775 is_logical = SlotIsLogical(slot);
776 }
777
778 if (slot->data.persistency == RS_EPHEMERAL)
779 {
780 /*
781 * Delete the slot. There is no !PANIC case where this is allowed to
782 * fail, all that may happen is an incomplete cleanup of the on-disk
783 * data.
784 */
786 }
787
788 /*
789 * If slot needed to temporarily restrain both data and catalog xmin to
790 * create the catalog snapshot, remove that temporary constraint.
791 * Snapshots can only be exported while the initial snapshot is still
792 * acquired.
793 */
794 if (!TransactionIdIsValid(slot->data.xmin) &&
796 {
797 SpinLockAcquire(&slot->mutex);
799 SpinLockRelease(&slot->mutex);
801 }
802
803 /*
804 * Set the time since the slot has become inactive. We get the current
805 * time beforehand to avoid system call while holding the spinlock.
806 */
808
809 if (slot->data.persistency == RS_PERSISTENT)
810 {
811 /*
812 * Mark persistent slot inactive. We're not freeing it, just
813 * disconnecting, but wake up others that may be waiting for it.
814 */
815 SpinLockAcquire(&slot->mutex);
816 slot->active_pid = 0;
818 SpinLockRelease(&slot->mutex);
820 }
821 else
823
824 MyReplicationSlot = NULL;
825
826 /* might not have been set when we've been a plain slot */
827 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
828 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
830 LWLockRelease(ProcArrayLock);
831
832 if (am_walsender)
833 {
835 is_logical
836 ? errmsg("released logical replication slot \"%s\"",
837 slotname)
838 : errmsg("released physical replication slot \"%s\"",
839 slotname));
840
841 pfree(slotname);
842 }
843}
844
845/*
846 * Cleanup temporary slots created in current session.
847 *
848 * Cleanup only synced temporary slots if 'synced_only' is true, else
849 * cleanup all temporary slots.
850 */
851void
852ReplicationSlotCleanup(bool synced_only)
853{
854 int i;
855
856 Assert(MyReplicationSlot == NULL);
857
858restart:
859 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
860 for (i = 0; i < max_replication_slots; i++)
861 {
863
864 if (!s->in_use)
865 continue;
866
868 if ((s->active_pid == MyProcPid &&
869 (!synced_only || s->data.synced)))
870 {
873 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
874
876
878 goto restart;
879 }
880 else
882 }
883
884 LWLockRelease(ReplicationSlotControlLock);
885}
886
887/*
888 * Permanently drop replication slot identified by the passed in name.
889 */
890void
891ReplicationSlotDrop(const char *name, bool nowait)
892{
893 Assert(MyReplicationSlot == NULL);
894
895 ReplicationSlotAcquire(name, nowait, false);
896
897 /*
898 * Do not allow users to drop the slots which are currently being synced
899 * from the primary to the standby.
900 */
903 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904 errmsg("cannot drop replication slot \"%s\"", name),
905 errdetail("This replication slot is being synchronized from the primary server."));
906
908}
909
910/*
911 * Change the definition of the slot identified by the specified name.
912 */
913void
914ReplicationSlotAlter(const char *name, const bool *failover,
915 const bool *two_phase)
916{
917 bool update_slot = false;
918
919 Assert(MyReplicationSlot == NULL);
921
922 ReplicationSlotAcquire(name, false, true);
923
926 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
927 errmsg("cannot use %s with a physical replication slot",
928 "ALTER_REPLICATION_SLOT"));
929
930 if (RecoveryInProgress())
931 {
932 /*
933 * Do not allow users to alter the slots which are currently being
934 * synced from the primary to the standby.
935 */
938 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
939 errmsg("cannot alter replication slot \"%s\"", name),
940 errdetail("This replication slot is being synchronized from the primary server."));
941
942 /*
943 * Do not allow users to enable failover on the standby as we do not
944 * support sync to the cascading standby.
945 */
946 if (failover && *failover)
948 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
949 errmsg("cannot enable failover for a replication slot"
950 " on the standby"));
951 }
952
953 if (failover)
954 {
955 /*
956 * Do not allow users to enable failover for temporary slots as we do
957 * not support syncing temporary slots to the standby.
958 */
961 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
962 errmsg("cannot enable failover for a temporary replication slot"));
963
965 {
969
970 update_slot = true;
971 }
972 }
973
975 {
979
980 update_slot = true;
981 }
982
983 if (update_slot)
984 {
987 }
988
990}
991
992/*
993 * Permanently drop the currently acquired replication slot.
994 */
995void
997{
999
1000 Assert(MyReplicationSlot != NULL);
1001
1002 /* slot isn't acquired anymore */
1003 MyReplicationSlot = NULL;
1004
1006}
1007
1008/*
1009 * Permanently drop the replication slot which will be released by the point
1010 * this function returns.
1011 */
1012static void
1014{
1015 char path[MAXPGPATH];
1016 char tmppath[MAXPGPATH];
1017
1018 /*
1019 * If some other backend ran this code concurrently with us, we might try
1020 * to delete a slot with a certain name while someone else was trying to
1021 * create a slot with the same name.
1022 */
1023 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1024
1025 /* Generate pathnames. */
1026 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1027 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1028
1029 /*
1030 * Rename the slot directory on disk, so that we'll no longer recognize
1031 * this as a valid slot. Note that if this fails, we've got to mark the
1032 * slot inactive before bailing out. If we're dropping an ephemeral or a
1033 * temporary slot, we better never fail hard as the caller won't expect
1034 * the slot to survive and this might get called during error handling.
1035 */
1036 if (rename(path, tmppath) == 0)
1037 {
1038 /*
1039 * We need to fsync() the directory we just renamed and its parent to
1040 * make sure that our changes are on disk in a crash-safe fashion. If
1041 * fsync() fails, we can't be sure whether the changes are on disk or
1042 * not. For now, we handle that by panicking;
1043 * StartupReplicationSlots() will try to straighten it out after
1044 * restart.
1045 */
1047 fsync_fname(tmppath, true);
1050 }
1051 else
1052 {
1053 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1054
1055 SpinLockAcquire(&slot->mutex);
1056 slot->active_pid = 0;
1057 SpinLockRelease(&slot->mutex);
1058
1059 /* wake up anyone waiting on this slot */
1061
1062 ereport(fail_softly ? WARNING : ERROR,
1064 errmsg("could not rename file \"%s\" to \"%s\": %m",
1065 path, tmppath)));
1066 }
1067
1068 /*
1069 * The slot is definitely gone. Lock out concurrent scans of the array
1070 * long enough to kill it. It's OK to clear the active PID here without
1071 * grabbing the mutex because nobody else can be scanning the array here,
1072 * and nobody can be attached to this slot and thus access it without
1073 * scanning the array.
1074 *
1075 * Also wake up processes waiting for it.
1076 */
1077 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1078 slot->active_pid = 0;
1079 slot->in_use = false;
1080 LWLockRelease(ReplicationSlotControlLock);
1082
1083 /*
1084 * Slot is dead and doesn't prevent resource removal anymore, recompute
1085 * limits.
1086 */
1089
1090 /*
1091 * If removing the directory fails, the worst thing that will happen is
1092 * that the user won't be able to create a new slot with the same name
1093 * until the next server restart. We warn about it, but that's all.
1094 */
1095 if (!rmtree(tmppath, true))
1097 (errmsg("could not remove directory \"%s\"", tmppath)));
1098
1099 /*
1100 * Drop the statistics entry for the replication slot. Do this while
1101 * holding ReplicationSlotAllocationLock so that we don't drop a
1102 * statistics entry for another slot with the same name just created in
1103 * another session.
1104 */
1105 if (SlotIsLogical(slot))
1107
1108 /*
1109 * We release this at the very end, so that nobody starts trying to create
1110 * a slot while we're still cleaning up the detritus of the old one.
1111 */
1112 LWLockRelease(ReplicationSlotAllocationLock);
1113}
1114
1115/*
1116 * Serialize the currently acquired slot's state from memory to disk, thereby
1117 * guaranteeing the current state will survive a crash.
1118 */
1119void
1121{
1122 char path[MAXPGPATH];
1123
1124 Assert(MyReplicationSlot != NULL);
1125
1128}
1129
1130/*
1131 * Signal that it would be useful if the currently acquired slot would be
1132 * flushed out to disk.
1133 *
1134 * Note that the actual flush to disk can be delayed for a long time, if
1135 * required for correctness explicitly do a ReplicationSlotSave().
1136 */
1137void
1139{
1141
1142 Assert(MyReplicationSlot != NULL);
1143
1144 SpinLockAcquire(&slot->mutex);
1146 MyReplicationSlot->dirty = true;
1147 SpinLockRelease(&slot->mutex);
1148}
1149
1150/*
1151 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1152 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1153 */
1154void
1156{
1158
1159 Assert(slot != NULL);
1161
1162 SpinLockAcquire(&slot->mutex);
1164 SpinLockRelease(&slot->mutex);
1165
1168}
1169
1170/*
1171 * Compute the oldest xmin across all slots and store it in the ProcArray.
1172 *
1173 * If already_locked is true, ProcArrayLock has already been acquired
1174 * exclusively.
1175 */
1176void
1178{
1179 int i;
1181 TransactionId agg_catalog_xmin = InvalidTransactionId;
1182
1183 Assert(ReplicationSlotCtl != NULL);
1184
1185 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1186
1187 for (i = 0; i < max_replication_slots; i++)
1188 {
1190 TransactionId effective_xmin;
1191 TransactionId effective_catalog_xmin;
1192 bool invalidated;
1193
1194 if (!s->in_use)
1195 continue;
1196
1198 effective_xmin = s->effective_xmin;
1199 effective_catalog_xmin = s->effective_catalog_xmin;
1200 invalidated = s->data.invalidated != RS_INVAL_NONE;
1202
1203 /* invalidated slots need not apply */
1204 if (invalidated)
1205 continue;
1206
1207 /* check the data xmin */
1208 if (TransactionIdIsValid(effective_xmin) &&
1209 (!TransactionIdIsValid(agg_xmin) ||
1210 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1211 agg_xmin = effective_xmin;
1212
1213 /* check the catalog xmin */
1214 if (TransactionIdIsValid(effective_catalog_xmin) &&
1215 (!TransactionIdIsValid(agg_catalog_xmin) ||
1216 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1217 agg_catalog_xmin = effective_catalog_xmin;
1218 }
1219
1220 LWLockRelease(ReplicationSlotControlLock);
1221
1222 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1223}
1224
1225/*
1226 * Compute the oldest restart LSN across all slots and inform xlog module.
1227 *
1228 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1229 * purpose, we don't try to account for that, because this module doesn't
1230 * know what to compare against.
1231 */
1232void
1234{
1235 int i;
1236 XLogRecPtr min_required = InvalidXLogRecPtr;
1237
1238 Assert(ReplicationSlotCtl != NULL);
1239
1240 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1241 for (i = 0; i < max_replication_slots; i++)
1242 {
1244 XLogRecPtr restart_lsn;
1245 XLogRecPtr last_saved_restart_lsn;
1246 bool invalidated;
1247 ReplicationSlotPersistency persistency;
1248
1249 if (!s->in_use)
1250 continue;
1251
1253 persistency = s->data.persistency;
1254 restart_lsn = s->data.restart_lsn;
1255 invalidated = s->data.invalidated != RS_INVAL_NONE;
1256 last_saved_restart_lsn = s->last_saved_restart_lsn;
1258
1259 /* invalidated slots need not apply */
1260 if (invalidated)
1261 continue;
1262
1263 /*
1264 * For persistent slot use last_saved_restart_lsn to compute the
1265 * oldest LSN for removal of WAL segments. The segments between
1266 * last_saved_restart_lsn and restart_lsn might be needed by a
1267 * persistent slot in the case of database crash. Non-persistent
1268 * slots can't survive the database crash, so we don't care about
1269 * last_saved_restart_lsn for them.
1270 */
1271 if (persistency == RS_PERSISTENT)
1272 {
1273 if (last_saved_restart_lsn != InvalidXLogRecPtr &&
1274 restart_lsn > last_saved_restart_lsn)
1275 {
1276 restart_lsn = last_saved_restart_lsn;
1277 }
1278 }
1279
1280 if (restart_lsn != InvalidXLogRecPtr &&
1281 (min_required == InvalidXLogRecPtr ||
1282 restart_lsn < min_required))
1283 min_required = restart_lsn;
1284 }
1285 LWLockRelease(ReplicationSlotControlLock);
1286
1288}
1289
1290/*
1291 * Compute the oldest WAL LSN required by *logical* decoding slots.
1292 *
1293 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1294 * slots exist.
1295 *
1296 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1297 * ignores physical replication slots.
1298 *
1299 * The results aren't required frequently, so we don't maintain a precomputed
1300 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1301 */
1304{
1306 int i;
1307
1308 if (max_replication_slots <= 0)
1309 return InvalidXLogRecPtr;
1310
1311 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1312
1313 for (i = 0; i < max_replication_slots; i++)
1314 {
1315 ReplicationSlot *s;
1316 XLogRecPtr restart_lsn;
1317 XLogRecPtr last_saved_restart_lsn;
1318 bool invalidated;
1319 ReplicationSlotPersistency persistency;
1320
1322
1323 /* cannot change while ReplicationSlotControlLock is held */
1324 if (!s->in_use)
1325 continue;
1326
1327 /* we're only interested in logical slots */
1328 if (!SlotIsLogical(s))
1329 continue;
1330
1331 /* read once, it's ok if it increases while we're checking */
1333 persistency = s->data.persistency;
1334 restart_lsn = s->data.restart_lsn;
1335 invalidated = s->data.invalidated != RS_INVAL_NONE;
1336 last_saved_restart_lsn = s->last_saved_restart_lsn;
1338
1339 /* invalidated slots need not apply */
1340 if (invalidated)
1341 continue;
1342
1343 /*
1344 * For persistent slot use last_saved_restart_lsn to compute the
1345 * oldest LSN for removal of WAL segments. The segments between
1346 * last_saved_restart_lsn and restart_lsn might be needed by a
1347 * persistent slot in the case of database crash. Non-persistent
1348 * slots can't survive the database crash, so we don't care about
1349 * last_saved_restart_lsn for them.
1350 */
1351 if (persistency == RS_PERSISTENT)
1352 {
1353 if (last_saved_restart_lsn != InvalidXLogRecPtr &&
1354 restart_lsn > last_saved_restart_lsn)
1355 {
1356 restart_lsn = last_saved_restart_lsn;
1357 }
1358 }
1359
1360 if (restart_lsn == InvalidXLogRecPtr)
1361 continue;
1362
1363 if (result == InvalidXLogRecPtr ||
1364 restart_lsn < result)
1365 result = restart_lsn;
1366 }
1367
1368 LWLockRelease(ReplicationSlotControlLock);
1369
1370 return result;
1371}
1372
1373/*
1374 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1375 * passed database oid.
1376 *
1377 * Returns true if there are any slots referencing the database. *nslots will
1378 * be set to the absolute number of slots in the database, *nactive to ones
1379 * currently active.
1380 */
1381bool
1382ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1383{
1384 int i;
1385
1386 *nslots = *nactive = 0;
1387
1388 if (max_replication_slots <= 0)
1389 return false;
1390
1391 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1392 for (i = 0; i < max_replication_slots; i++)
1393 {
1394 ReplicationSlot *s;
1395
1397
1398 /* cannot change while ReplicationSlotCtlLock is held */
1399 if (!s->in_use)
1400 continue;
1401
1402 /* only logical slots are database specific, skip */
1403 if (!SlotIsLogical(s))
1404 continue;
1405
1406 /* not our database, skip */
1407 if (s->data.database != dboid)
1408 continue;
1409
1410 /* NB: intentionally counting invalidated slots */
1411
1412 /* count slots with spinlock held */
1414 (*nslots)++;
1415 if (s->active_pid != 0)
1416 (*nactive)++;
1418 }
1419 LWLockRelease(ReplicationSlotControlLock);
1420
1421 if (*nslots > 0)
1422 return true;
1423 return false;
1424}
1425
1426/*
1427 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1428 * passed database oid. The caller should hold an exclusive lock on the
1429 * pg_database oid for the database to prevent creation of new slots on the db
1430 * or replay from existing slots.
1431 *
1432 * Another session that concurrently acquires an existing slot on the target DB
1433 * (most likely to drop it) may cause this function to ERROR. If that happens
1434 * it may have dropped some but not all slots.
1435 *
1436 * This routine isn't as efficient as it could be - but we don't drop
1437 * databases often, especially databases with lots of slots.
1438 */
1439void
1441{
1442 int i;
1443
1444 if (max_replication_slots <= 0)
1445 return;
1446
1447restart:
1448 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1449 for (i = 0; i < max_replication_slots; i++)
1450 {
1451 ReplicationSlot *s;
1452 char *slotname;
1453 int active_pid;
1454
1456
1457 /* cannot change while ReplicationSlotControlLock is held */
1458 if (!s->in_use)
1459 continue;
1460
1461 /* only logical slots are database specific, skip */
1462 if (!SlotIsLogical(s))
1463 continue;
1464
1465 /* not our database, skip */
1466 if (s->data.database != dboid)
1467 continue;
1468
1469 /* NB: intentionally including invalidated slots */
1470
1471 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1473 /* can't change while ReplicationSlotControlLock is held */
1474 slotname = NameStr(s->data.name);
1475 active_pid = s->active_pid;
1476 if (active_pid == 0)
1477 {
1479 s->active_pid = MyProcPid;
1480 }
1482
1483 /*
1484 * Even though we hold an exclusive lock on the database object a
1485 * logical slot for that DB can still be active, e.g. if it's
1486 * concurrently being dropped by a backend connected to another DB.
1487 *
1488 * That's fairly unlikely in practice, so we'll just bail out.
1489 *
1490 * The slot sync worker holds a shared lock on the database before
1491 * operating on synced logical slots to avoid conflict with the drop
1492 * happening here. The persistent synced slots are thus safe but there
1493 * is a possibility that the slot sync worker has created a temporary
1494 * slot (which stays active even on release) and we are trying to drop
1495 * that here. In practice, the chances of hitting this scenario are
1496 * less as during slot synchronization, the temporary slot is
1497 * immediately converted to persistent and thus is safe due to the
1498 * shared lock taken on the database. So, we'll just bail out in such
1499 * a case.
1500 *
1501 * XXX: We can consider shutting down the slot sync worker before
1502 * trying to drop synced temporary slots here.
1503 */
1504 if (active_pid)
1505 ereport(ERROR,
1506 (errcode(ERRCODE_OBJECT_IN_USE),
1507 errmsg("replication slot \"%s\" is active for PID %d",
1508 slotname, active_pid)));
1509
1510 /*
1511 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1512 * holding ReplicationSlotControlLock over filesystem operations,
1513 * release ReplicationSlotControlLock and use
1514 * ReplicationSlotDropAcquired.
1515 *
1516 * As that means the set of slots could change, restart scan from the
1517 * beginning each time we release the lock.
1518 */
1519 LWLockRelease(ReplicationSlotControlLock);
1521 goto restart;
1522 }
1523 LWLockRelease(ReplicationSlotControlLock);
1524}
1525
1526
1527/*
1528 * Check whether the server's configuration supports using replication
1529 * slots.
1530 */
1531void
1533{
1534 /*
1535 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1536 * needs the same check.
1537 */
1538
1539 if (max_replication_slots == 0)
1540 ereport(ERROR,
1541 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1542 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1543
1545 ereport(ERROR,
1546 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1547 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1548}
1549
1550/*
1551 * Check whether the user has privilege to use replication slots.
1552 */
1553void
1555{
1557 ereport(ERROR,
1558 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1559 errmsg("permission denied to use replication slots"),
1560 errdetail("Only roles with the %s attribute may use replication slots.",
1561 "REPLICATION")));
1562}
1563
1564/*
1565 * Reserve WAL for the currently active slot.
1566 *
1567 * Compute and set restart_lsn in a manner that's appropriate for the type of
1568 * the slot and concurrency safe.
1569 */
1570void
1572{
1574
1575 Assert(slot != NULL);
1578
1579 /*
1580 * The replication slot mechanism is used to prevent removal of required
1581 * WAL. As there is no interlock between this routine and checkpoints, WAL
1582 * segments could concurrently be removed when a now stale return value of
1583 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1584 * this happens we'll just retry.
1585 */
1586 while (true)
1587 {
1588 XLogSegNo segno;
1589 XLogRecPtr restart_lsn;
1590
1591 /*
1592 * For logical slots log a standby snapshot and start logical decoding
1593 * at exactly that position. That allows the slot to start up more
1594 * quickly. But on a standby we cannot do WAL writes, so just use the
1595 * replay pointer; effectively, an attempt to create a logical slot on
1596 * standby will cause it to wait for an xl_running_xact record to be
1597 * logged independently on the primary, so that a snapshot can be
1598 * built using the record.
1599 *
1600 * None of this is needed (or indeed helpful) for physical slots as
1601 * they'll start replay at the last logged checkpoint anyway. Instead
1602 * return the location of the last redo LSN. While that slightly
1603 * increases the chance that we have to retry, it's where a base
1604 * backup has to start replay at.
1605 */
1606 if (SlotIsPhysical(slot))
1607 restart_lsn = GetRedoRecPtr();
1608 else if (RecoveryInProgress())
1609 restart_lsn = GetXLogReplayRecPtr(NULL);
1610 else
1611 restart_lsn = GetXLogInsertRecPtr();
1612
1613 SpinLockAcquire(&slot->mutex);
1614 slot->data.restart_lsn = restart_lsn;
1615 SpinLockRelease(&slot->mutex);
1616
1617 /* prevent WAL removal as fast as possible */
1619
1620 /*
1621 * If all required WAL is still there, great, otherwise retry. The
1622 * slot should prevent further removal of WAL, unless there's a
1623 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1624 * the new restart_lsn above, so normally we should never need to loop
1625 * more than twice.
1626 */
1628 if (XLogGetLastRemovedSegno() < segno)
1629 break;
1630 }
1631
1632 if (!RecoveryInProgress() && SlotIsLogical(slot))
1633 {
1634 XLogRecPtr flushptr;
1635
1636 /* make sure we have enough information to start */
1637 flushptr = LogStandbySnapshot();
1638
1639 /* and make sure it's fsynced to disk */
1640 XLogFlush(flushptr);
1641 }
1642}
1643
1644/*
1645 * Report that replication slot needs to be invalidated
1646 */
1647static void
1649 bool terminating,
1650 int pid,
1651 NameData slotname,
1652 XLogRecPtr restart_lsn,
1653 XLogRecPtr oldestLSN,
1654 TransactionId snapshotConflictHorizon,
1655 long slot_idle_seconds)
1656{
1657 StringInfoData err_detail;
1658 StringInfoData err_hint;
1659
1660 initStringInfo(&err_detail);
1661 initStringInfo(&err_hint);
1662
1663 switch (cause)
1664 {
1666 {
1667 uint64 ex = oldestLSN - restart_lsn;
1668
1669 appendStringInfo(&err_detail,
1670 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1671 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1672 ex),
1673 LSN_FORMAT_ARGS(restart_lsn),
1674 ex);
1675 /* translator: %s is a GUC variable name */
1676 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1677 "max_slot_wal_keep_size");
1678 break;
1679 }
1680 case RS_INVAL_HORIZON:
1681 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1682 snapshotConflictHorizon);
1683 break;
1684
1685 case RS_INVAL_WAL_LEVEL:
1686 appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1687 break;
1688
1690 {
1691 /* translator: %s is a GUC variable name */
1692 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1693 slot_idle_seconds, "idle_replication_slot_timeout",
1695 /* translator: %s is a GUC variable name */
1696 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1697 "idle_replication_slot_timeout");
1698 break;
1699 }
1700 case RS_INVAL_NONE:
1702 }
1703
1704 ereport(LOG,
1705 terminating ?
1706 errmsg("terminating process %d to release replication slot \"%s\"",
1707 pid, NameStr(slotname)) :
1708 errmsg("invalidating obsolete replication slot \"%s\"",
1709 NameStr(slotname)),
1710 errdetail_internal("%s", err_detail.data),
1711 err_hint.len ? errhint("%s", err_hint.data) : 0);
1712
1713 pfree(err_detail.data);
1714 pfree(err_hint.data);
1715}
1716
1717/*
1718 * Can we invalidate an idle replication slot?
1719 *
1720 * Idle timeout invalidation is allowed only when:
1721 *
1722 * 1. Idle timeout is set
1723 * 2. Slot has reserved WAL
1724 * 3. Slot is inactive
1725 * 4. The slot is not being synced from the primary while the server is in
1726 * recovery. This is because synced slots are always considered to be
1727 * inactive because they don't perform logical decoding to produce changes.
1728 */
1729static inline bool
1731{
1734 s->inactive_since > 0 &&
1735 !(RecoveryInProgress() && s->data.synced));
1736}
1737
1738/*
1739 * DetermineSlotInvalidationCause - Determine the cause for which a slot
1740 * becomes invalid among the given possible causes.
1741 *
1742 * This function sequentially checks all possible invalidation causes and
1743 * returns the first one for which the slot is eligible for invalidation.
1744 */
1747 XLogRecPtr oldestLSN, Oid dboid,
1748 TransactionId snapshotConflictHorizon,
1749 TimestampTz *inactive_since, TimestampTz now)
1750{
1751 Assert(possible_causes != RS_INVAL_NONE);
1752
1753 if (possible_causes & RS_INVAL_WAL_REMOVED)
1754 {
1755 XLogRecPtr restart_lsn = s->data.restart_lsn;
1756
1757 if (restart_lsn != InvalidXLogRecPtr &&
1758 restart_lsn < oldestLSN)
1759 return RS_INVAL_WAL_REMOVED;
1760 }
1761
1762 if (possible_causes & RS_INVAL_HORIZON)
1763 {
1764 /* invalid DB oid signals a shared relation */
1765 if (SlotIsLogical(s) &&
1766 (dboid == InvalidOid || dboid == s->data.database))
1767 {
1768 TransactionId effective_xmin = s->effective_xmin;
1769 TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1770
1771 if (TransactionIdIsValid(effective_xmin) &&
1772 TransactionIdPrecedesOrEquals(effective_xmin,
1773 snapshotConflictHorizon))
1774 return RS_INVAL_HORIZON;
1775 else if (TransactionIdIsValid(catalog_effective_xmin) &&
1776 TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1777 snapshotConflictHorizon))
1778 return RS_INVAL_HORIZON;
1779 }
1780 }
1781
1782 if (possible_causes & RS_INVAL_WAL_LEVEL)
1783 {
1784 if (SlotIsLogical(s))
1785 return RS_INVAL_WAL_LEVEL;
1786 }
1787
1788 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1789 {
1790 Assert(now > 0);
1791
1792 if (CanInvalidateIdleSlot(s))
1793 {
1794 /*
1795 * Simulate the invalidation due to idle_timeout to test the
1796 * timeout behavior promptly, without waiting for it to trigger
1797 * naturally.
1798 */
1799#ifdef USE_INJECTION_POINTS
1800 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1801 {
1802 *inactive_since = 0; /* since the beginning of time */
1803 return RS_INVAL_IDLE_TIMEOUT;
1804 }
1805#endif
1806
1807 /*
1808 * Check if the slot needs to be invalidated due to
1809 * idle_replication_slot_timeout GUC.
1810 */
1813 {
1814 *inactive_since = s->inactive_since;
1815 return RS_INVAL_IDLE_TIMEOUT;
1816 }
1817 }
1818 }
1819
1820 return RS_INVAL_NONE;
1821}
1822
1823/*
1824 * Helper for InvalidateObsoleteReplicationSlots
1825 *
1826 * Acquires the given slot and marks it invalid, if necessary and possible.
1827 *
1828 * Returns whether ReplicationSlotControlLock was released in the interim (and
1829 * in that case we're not holding the lock at return, otherwise we are).
1830 *
1831 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1832 *
1833 * This is inherently racy, because we release the LWLock
1834 * for syscalls, so caller must restart if we return true.
1835 */
1836static bool
1838 ReplicationSlot *s,
1839 XLogRecPtr oldestLSN,
1840 Oid dboid, TransactionId snapshotConflictHorizon,
1841 bool *invalidated)
1842{
1843 int last_signaled_pid = 0;
1844 bool released_lock = false;
1845 TimestampTz inactive_since = 0;
1846
1847 for (;;)
1848 {
1849 XLogRecPtr restart_lsn;
1850 NameData slotname;
1851 int active_pid = 0;
1853 TimestampTz now = 0;
1854 long slot_idle_secs = 0;
1855
1856 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1857
1858 if (!s->in_use)
1859 {
1860 if (released_lock)
1861 LWLockRelease(ReplicationSlotControlLock);
1862 break;
1863 }
1864
1865 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1866 {
1867 /*
1868 * Assign the current time here to avoid system call overhead
1869 * while holding the spinlock in subsequent code.
1870 */
1872 }
1873
1874 /*
1875 * Check if the slot needs to be invalidated. If it needs to be
1876 * invalidated, and is not currently acquired, acquire it and mark it
1877 * as having been invalidated. We do this with the spinlock held to
1878 * avoid race conditions -- for example the restart_lsn could move
1879 * forward, or the slot could be dropped.
1880 */
1882
1883 restart_lsn = s->data.restart_lsn;
1884
1885 /* we do nothing if the slot is already invalid */
1886 if (s->data.invalidated == RS_INVAL_NONE)
1887 invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
1888 s, oldestLSN,
1889 dboid,
1890 snapshotConflictHorizon,
1891 &inactive_since,
1892 now);
1893
1894 /* if there's no invalidation, we're done */
1895 if (invalidation_cause == RS_INVAL_NONE)
1896 {
1898 if (released_lock)
1899 LWLockRelease(ReplicationSlotControlLock);
1900 break;
1901 }
1902
1903 slotname = s->data.name;
1904 active_pid = s->active_pid;
1905
1906 /*
1907 * If the slot can be acquired, do so and mark it invalidated
1908 * immediately. Otherwise we'll signal the owning process, below, and
1909 * retry.
1910 *
1911 * Note: Unlike other slot attributes, slot's inactive_since can't be
1912 * changed until the acquired slot is released or the owning process
1913 * is terminated. So, the inactive slot can only be invalidated
1914 * immediately without being terminated.
1915 */
1916 if (active_pid == 0)
1917 {
1919 s->active_pid = MyProcPid;
1920 s->data.invalidated = invalidation_cause;
1921
1922 /*
1923 * XXX: We should consider not overwriting restart_lsn and instead
1924 * just rely on .invalidated.
1925 */
1926 if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1927 {
1930 }
1931
1932 /* Let caller know */
1933 *invalidated = true;
1934 }
1935
1937
1938 /*
1939 * Calculate the idle time duration of the slot if slot is marked
1940 * invalidated with RS_INVAL_IDLE_TIMEOUT.
1941 */
1942 if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
1943 {
1944 int slot_idle_usecs;
1945
1946 TimestampDifference(inactive_since, now, &slot_idle_secs,
1947 &slot_idle_usecs);
1948 }
1949
1950 if (active_pid != 0)
1951 {
1952 /*
1953 * Prepare the sleep on the slot's condition variable before
1954 * releasing the lock, to close a possible race condition if the
1955 * slot is released before the sleep below.
1956 */
1958
1959 LWLockRelease(ReplicationSlotControlLock);
1960 released_lock = true;
1961
1962 /*
1963 * Signal to terminate the process that owns the slot, if we
1964 * haven't already signalled it. (Avoidance of repeated
1965 * signalling is the only reason for there to be a loop in this
1966 * routine; otherwise we could rely on caller's restart loop.)
1967 *
1968 * There is the race condition that other process may own the slot
1969 * after its current owner process is terminated and before this
1970 * process owns it. To handle that, we signal only if the PID of
1971 * the owning process has changed from the previous time. (This
1972 * logic assumes that the same PID is not reused very quickly.)
1973 */
1974 if (last_signaled_pid != active_pid)
1975 {
1976 ReportSlotInvalidation(invalidation_cause, true, active_pid,
1977 slotname, restart_lsn,
1978 oldestLSN, snapshotConflictHorizon,
1979 slot_idle_secs);
1980
1981 if (MyBackendType == B_STARTUP)
1982 (void) SendProcSignal(active_pid,
1985 else
1986 (void) kill(active_pid, SIGTERM);
1987
1988 last_signaled_pid = active_pid;
1989 }
1990
1991 /* Wait until the slot is released. */
1993 WAIT_EVENT_REPLICATION_SLOT_DROP);
1994
1995 /*
1996 * Re-acquire lock and start over; we expect to invalidate the
1997 * slot next time (unless another process acquires the slot in the
1998 * meantime).
1999 *
2000 * Note: It is possible for a slot to advance its restart_lsn or
2001 * xmin values sufficiently between when we release the mutex and
2002 * when we recheck, moving from a conflicting state to a non
2003 * conflicting state. This is intentional and safe: if the slot
2004 * has caught up while we're busy here, the resources we were
2005 * concerned about (WAL segments or tuples) have not yet been
2006 * removed, and there's no reason to invalidate the slot.
2007 */
2008 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2009 continue;
2010 }
2011 else
2012 {
2013 /*
2014 * We hold the slot now and have already invalidated it; flush it
2015 * to ensure that state persists.
2016 *
2017 * Don't want to hold ReplicationSlotControlLock across file
2018 * system operations, so release it now but be sure to tell caller
2019 * to restart from scratch.
2020 */
2021 LWLockRelease(ReplicationSlotControlLock);
2022 released_lock = true;
2023
2024 /* Make sure the invalidated state persists across server restart */
2028
2029 ReportSlotInvalidation(invalidation_cause, false, active_pid,
2030 slotname, restart_lsn,
2031 oldestLSN, snapshotConflictHorizon,
2032 slot_idle_secs);
2033
2034 /* done with this slot for now */
2035 break;
2036 }
2037 }
2038
2039 Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2040
2041 return released_lock;
2042}
2043
2044/*
2045 * Invalidate slots that require resources about to be removed.
2046 *
2047 * Returns true if any slot was invalidated.
2048 *
2049 * Whether a slot needs to be invalidated depends on the invalidation cause.
2050 * A slot is invalidated if it:
2051 * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2052 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2053 * db; dboid may be InvalidOid for shared relations
2054 * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
2055 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2056 * "idle_replication_slot_timeout" duration.
2057 *
2058 * Note: This function attempts to invalidate the slot for multiple possible
2059 * causes in a single pass, minimizing redundant iterations. The
2060 * "possible_causes" parameter is a bitmask of one or more of the defined causes.
2061 *
2062 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2063 */
2064bool
2066 XLogSegNo oldestSegno, Oid dboid,
2067 TransactionId snapshotConflictHorizon)
2068{
2069 XLogRecPtr oldestLSN;
2070 bool invalidated = false;
2071
2072 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2073 Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2074 Assert(possible_causes != RS_INVAL_NONE);
2075
2076 if (max_replication_slots == 0)
2077 return invalidated;
2078
2079 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2080
2081restart:
2082 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2083 for (int i = 0; i < max_replication_slots; i++)
2084 {
2086
2087 if (!s->in_use)
2088 continue;
2089
2090 /* Prevent invalidation of logical slots during binary upgrade */
2092 continue;
2093
2094 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2095 snapshotConflictHorizon,
2096 &invalidated))
2097 {
2098 /* if the lock was released, start from scratch */
2099 goto restart;
2100 }
2101 }
2102 LWLockRelease(ReplicationSlotControlLock);
2103
2104 /*
2105 * If any slots have been invalidated, recalculate the resource limits.
2106 */
2107 if (invalidated)
2108 {
2111 }
2112
2113 return invalidated;
2114}
2115
2116/*
2117 * Flush all replication slots to disk.
2118 *
2119 * It is convenient to flush dirty replication slots at the time of checkpoint.
2120 * Additionally, in case of a shutdown checkpoint, we also identify the slots
2121 * for which the confirmed_flush LSN has been updated since the last time it
2122 * was saved and flush them.
2123 */
2124void
2126{
2127 int i;
2128 bool last_saved_restart_lsn_updated = false;
2129
2130 elog(DEBUG1, "performing replication slot checkpoint");
2131
2132 /*
2133 * Prevent any slot from being created/dropped while we're active. As we
2134 * explicitly do *not* want to block iterating over replication_slots or
2135 * acquiring a slot we cannot take the control lock - but that's OK,
2136 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2137 * enough to guarantee that nobody can change the in_use bits on us.
2138 */
2139 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2140
2141 for (i = 0; i < max_replication_slots; i++)
2142 {
2144 char path[MAXPGPATH];
2145
2146 if (!s->in_use)
2147 continue;
2148
2149 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2150 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2151
2152 /*
2153 * Slot's data is not flushed each time the confirmed_flush LSN is
2154 * updated as that could lead to frequent writes. However, we decide
2155 * to force a flush of all logical slot's data at the time of shutdown
2156 * if the confirmed_flush LSN is changed since we last flushed it to
2157 * disk. This helps in avoiding an unnecessary retreat of the
2158 * confirmed_flush LSN after restart.
2159 */
2160 if (is_shutdown && SlotIsLogical(s))
2161 {
2163
2164 if (s->data.invalidated == RS_INVAL_NONE &&
2166 {
2167 s->just_dirtied = true;
2168 s->dirty = true;
2169 }
2171 }
2172
2173 /*
2174 * Track if we're going to update slot's last_saved_restart_lsn. We
2175 * need this to know if we need to recompute the required LSN.
2176 */
2178 last_saved_restart_lsn_updated = true;
2179
2180 SaveSlotToPath(s, path, LOG);
2181 }
2182 LWLockRelease(ReplicationSlotAllocationLock);
2183
2184 /*
2185 * Recompute the required LSN if SaveSlotToPath() updated
2186 * last_saved_restart_lsn for any slot.
2187 */
2188 if (last_saved_restart_lsn_updated)
2190}
2191
2192/*
2193 * Load all replication slots from disk into memory at server startup. This
2194 * needs to be run before we start crash recovery.
2195 */
2196void
2198{
2199 DIR *replication_dir;
2200 struct dirent *replication_de;
2201
2202 elog(DEBUG1, "starting up replication slots");
2203
2204 /* restore all slots by iterating over all on-disk entries */
2205 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2206 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2207 {
2208 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2209 PGFileType de_type;
2210
2211 if (strcmp(replication_de->d_name, ".") == 0 ||
2212 strcmp(replication_de->d_name, "..") == 0)
2213 continue;
2214
2215 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2216 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2217
2218 /* we're only creating directories here, skip if it's not ours */
2219 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2220 continue;
2221
2222 /* we crashed while a slot was being setup or deleted, clean up */
2223 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2224 {
2225 if (!rmtree(path, true))
2226 {
2228 (errmsg("could not remove directory \"%s\"",
2229 path)));
2230 continue;
2231 }
2233 continue;
2234 }
2235
2236 /* looks like a slot in a normal state, restore */
2237 RestoreSlotFromDisk(replication_de->d_name);
2238 }
2239 FreeDir(replication_dir);
2240
2241 /* currently no slots exist, we're done. */
2242 if (max_replication_slots <= 0)
2243 return;
2244
2245 /* Now that we have recovered all the data, compute replication xmin */
2248}
2249
2250/* ----
2251 * Manipulation of on-disk state of replication slots
2252 *
2253 * NB: none of the routines below should take any notice whether a slot is the
2254 * current one or not, that's all handled a layer above.
2255 * ----
2256 */
2257static void
2259{
2260 char tmppath[MAXPGPATH];
2261 char path[MAXPGPATH];
2262 struct stat st;
2263
2264 /*
2265 * No need to take out the io_in_progress_lock, nobody else can see this
2266 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2267 * takes out the lock, if we'd take the lock here, we'd deadlock.
2268 */
2269
2270 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2271 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2272
2273 /*
2274 * It's just barely possible that some previous effort to create or drop a
2275 * slot with this name left a temp directory lying around. If that seems
2276 * to be the case, try to remove it. If the rmtree() fails, we'll error
2277 * out at the MakePGDirectory() below, so we don't bother checking
2278 * success.
2279 */
2280 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2281 rmtree(tmppath, true);
2282
2283 /* Create and fsync the temporary slot directory. */
2284 if (MakePGDirectory(tmppath) < 0)
2285 ereport(ERROR,
2287 errmsg("could not create directory \"%s\": %m",
2288 tmppath)));
2289 fsync_fname(tmppath, true);
2290
2291 /* Write the actual state file. */
2292 slot->dirty = true; /* signal that we really need to write */
2293 SaveSlotToPath(slot, tmppath, ERROR);
2294
2295 /* Rename the directory into place. */
2296 if (rename(tmppath, path) != 0)
2297 ereport(ERROR,
2299 errmsg("could not rename file \"%s\" to \"%s\": %m",
2300 tmppath, path)));
2301
2302 /*
2303 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2304 * would persist after an OS crash or not - so, force a restart. The
2305 * restart would try to fsync this again till it works.
2306 */
2308
2309 fsync_fname(path, true);
2311
2313}
2314
2315/*
2316 * Shared functionality between saving and creating a replication slot.
2317 */
2318static void
2319SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2320{
2321 char tmppath[MAXPGPATH];
2322 char path[MAXPGPATH];
2323 int fd;
2325 bool was_dirty;
2326
2327 /* first check whether there's something to write out */
2328 SpinLockAcquire(&slot->mutex);
2329 was_dirty = slot->dirty;
2330 slot->just_dirtied = false;
2331 SpinLockRelease(&slot->mutex);
2332
2333 /* and don't do anything if there's nothing to write */
2334 if (!was_dirty)
2335 return;
2336
2338
2339 /* silence valgrind :( */
2340 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2341
2342 sprintf(tmppath, "%s/state.tmp", dir);
2343 sprintf(path, "%s/state", dir);
2344
2345 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2346 if (fd < 0)
2347 {
2348 /*
2349 * If not an ERROR, then release the lock before returning. In case
2350 * of an ERROR, the error recovery path automatically releases the
2351 * lock, but no harm in explicitly releasing even in that case. Note
2352 * that LWLockRelease() could affect errno.
2353 */
2354 int save_errno = errno;
2355
2357 errno = save_errno;
2358 ereport(elevel,
2360 errmsg("could not create file \"%s\": %m",
2361 tmppath)));
2362 return;
2363 }
2364
2365 cp.magic = SLOT_MAGIC;
2367 cp.version = SLOT_VERSION;
2369
2370 SpinLockAcquire(&slot->mutex);
2371
2372 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2373
2374 SpinLockRelease(&slot->mutex);
2375
2379 FIN_CRC32C(cp.checksum);
2380
2381 errno = 0;
2382 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2383 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2384 {
2385 int save_errno = errno;
2386
2389 unlink(tmppath);
2391
2392 /* if write didn't set errno, assume problem is no disk space */
2393 errno = save_errno ? save_errno : ENOSPC;
2394 ereport(elevel,
2396 errmsg("could not write to file \"%s\": %m",
2397 tmppath)));
2398 return;
2399 }
2401
2402 /* fsync the temporary file */
2403 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2404 if (pg_fsync(fd) != 0)
2405 {
2406 int save_errno = errno;
2407
2410 unlink(tmppath);
2412
2413 errno = save_errno;
2414 ereport(elevel,
2416 errmsg("could not fsync file \"%s\": %m",
2417 tmppath)));
2418 return;
2419 }
2421
2422 if (CloseTransientFile(fd) != 0)
2423 {
2424 int save_errno = errno;
2425
2426 unlink(tmppath);
2428
2429 errno = save_errno;
2430 ereport(elevel,
2432 errmsg("could not close file \"%s\": %m",
2433 tmppath)));
2434 return;
2435 }
2436
2437 /* rename to permanent file, fsync file and directory */
2438 if (rename(tmppath, path) != 0)
2439 {
2440 int save_errno = errno;
2441
2442 unlink(tmppath);
2444
2445 errno = save_errno;
2446 ereport(elevel,
2448 errmsg("could not rename file \"%s\" to \"%s\": %m",
2449 tmppath, path)));
2450 return;
2451 }
2452
2453 /*
2454 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2455 */
2457
2458 fsync_fname(path, false);
2459 fsync_fname(dir, true);
2461
2463
2464 /*
2465 * Successfully wrote, unset dirty bit, unless somebody dirtied again
2466 * already and remember the confirmed_flush LSN value.
2467 */
2468 SpinLockAcquire(&slot->mutex);
2469 if (!slot->just_dirtied)
2470 slot->dirty = false;
2473 SpinLockRelease(&slot->mutex);
2474
2476}
2477
2478/*
2479 * Load a single slot from disk into memory.
2480 */
2481static void
2483{
2485 int i;
2486 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2487 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2488 int fd;
2489 bool restored = false;
2490 int readBytes;
2491 pg_crc32c checksum;
2492 TimestampTz now = 0;
2493
2494 /* no need to lock here, no concurrent access allowed yet */
2495
2496 /* delete temp file if it exists */
2497 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2498 sprintf(path, "%s/state.tmp", slotdir);
2499 if (unlink(path) < 0 && errno != ENOENT)
2500 ereport(PANIC,
2502 errmsg("could not remove file \"%s\": %m", path)));
2503
2504 sprintf(path, "%s/state", slotdir);
2505
2506 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2507
2508 /* on some operating systems fsyncing a file requires O_RDWR */
2509 fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2510
2511 /*
2512 * We do not need to handle this as we are rename()ing the directory into
2513 * place only after we fsync()ed the state file.
2514 */
2515 if (fd < 0)
2516 ereport(PANIC,
2518 errmsg("could not open file \"%s\": %m", path)));
2519
2520 /*
2521 * Sync state file before we're reading from it. We might have crashed
2522 * while it wasn't synced yet and we shouldn't continue on that basis.
2523 */
2524 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2525 if (pg_fsync(fd) != 0)
2526 ereport(PANIC,
2528 errmsg("could not fsync file \"%s\": %m",
2529 path)));
2531
2532 /* Also sync the parent directory */
2534 fsync_fname(slotdir, true);
2536
2537 /* read part of statefile that's guaranteed to be version independent */
2538 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2539 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2541 if (readBytes != ReplicationSlotOnDiskConstantSize)
2542 {
2543 if (readBytes < 0)
2544 ereport(PANIC,
2546 errmsg("could not read file \"%s\": %m", path)));
2547 else
2548 ereport(PANIC,
2550 errmsg("could not read file \"%s\": read %d of %zu",
2551 path, readBytes,
2553 }
2554
2555 /* verify magic */
2556 if (cp.magic != SLOT_MAGIC)
2557 ereport(PANIC,
2559 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2560 path, cp.magic, SLOT_MAGIC)));
2561
2562 /* verify version */
2563 if (cp.version != SLOT_VERSION)
2564 ereport(PANIC,
2566 errmsg("replication slot file \"%s\" has unsupported version %u",
2567 path, cp.version)));
2568
2569 /* boundary check on length */
2571 ereport(PANIC,
2573 errmsg("replication slot file \"%s\" has corrupted length %u",
2574 path, cp.length)));
2575
2576 /* Now that we know the size, read the entire file */
2577 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2578 readBytes = read(fd,
2579 (char *) &cp + ReplicationSlotOnDiskConstantSize,
2580 cp.length);
2582 if (readBytes != cp.length)
2583 {
2584 if (readBytes < 0)
2585 ereport(PANIC,
2587 errmsg("could not read file \"%s\": %m", path)));
2588 else
2589 ereport(PANIC,
2591 errmsg("could not read file \"%s\": read %d of %zu",
2592 path, readBytes, (Size) cp.length)));
2593 }
2594
2595 if (CloseTransientFile(fd) != 0)
2596 ereport(PANIC,
2598 errmsg("could not close file \"%s\": %m", path)));
2599
2600 /* now verify the CRC */
2601 INIT_CRC32C(checksum);
2602 COMP_CRC32C(checksum,
2605 FIN_CRC32C(checksum);
2606
2607 if (!EQ_CRC32C(checksum, cp.checksum))
2608 ereport(PANIC,
2609 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2610 path, checksum, cp.checksum)));
2611
2612 /*
2613 * If we crashed with an ephemeral slot active, don't restore but delete
2614 * it.
2615 */
2617 {
2618 if (!rmtree(slotdir, true))
2619 {
2621 (errmsg("could not remove directory \"%s\"",
2622 slotdir)));
2623 }
2625 return;
2626 }
2627
2628 /*
2629 * Verify that requirements for the specific slot type are met. That's
2630 * important because if these aren't met we're not guaranteed to retain
2631 * all the necessary resources for the slot.
2632 *
2633 * NB: We have to do so *after* the above checks for ephemeral slots,
2634 * because otherwise a slot that shouldn't exist anymore could prevent
2635 * restarts.
2636 *
2637 * NB: Changing the requirements here also requires adapting
2638 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2639 */
2640 if (cp.slotdata.database != InvalidOid)
2641 {
2643 ereport(FATAL,
2644 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2645 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2646 NameStr(cp.slotdata.name)),
2647 errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2648
2649 /*
2650 * In standby mode, the hot standby must be enabled. This check is
2651 * necessary to ensure logical slots are invalidated when they become
2652 * incompatible due to insufficient wal_level. Otherwise, if the
2653 * primary reduces wal_level < logical while hot standby is disabled,
2654 * logical slots would remain valid even after promotion.
2655 */
2657 ereport(FATAL,
2658 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2659 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2660 NameStr(cp.slotdata.name)),
2661 errhint("Change \"hot_standby\" to be \"on\".")));
2662 }
2663 else if (wal_level < WAL_LEVEL_REPLICA)
2664 ereport(FATAL,
2665 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2666 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2667 NameStr(cp.slotdata.name)),
2668 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2669
2670 /* nothing can be active yet, don't lock anything */
2671 for (i = 0; i < max_replication_slots; i++)
2672 {
2673 ReplicationSlot *slot;
2674
2676
2677 if (slot->in_use)
2678 continue;
2679
2680 /* restore the entire set of persistent data */
2681 memcpy(&slot->data, &cp.slotdata,
2683
2684 /* initialize in memory state */
2685 slot->effective_xmin = cp.slotdata.xmin;
2689
2694
2695 slot->in_use = true;
2696 slot->active_pid = 0;
2697
2698 /*
2699 * Set the time since the slot has become inactive after loading the
2700 * slot from the disk into memory. Whoever acquires the slot i.e.
2701 * makes the slot active will reset it. Use the same inactive_since
2702 * time for all the slots.
2703 */
2704 if (now == 0)
2706
2708
2709 restored = true;
2710 break;
2711 }
2712
2713 if (!restored)
2714 ereport(FATAL,
2715 (errmsg("too many replication slots active before shutdown"),
2716 errhint("Increase \"max_replication_slots\" and try again.")));
2717}
2718
2719/*
2720 * Maps an invalidation reason for a replication slot to
2721 * ReplicationSlotInvalidationCause.
2722 */
2724GetSlotInvalidationCause(const char *cause_name)
2725{
2726 Assert(cause_name);
2727
2728 /* Search lookup table for the cause having this name */
2729 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2730 {
2731 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2733 }
2734
2735 Assert(false);
2736 return RS_INVAL_NONE; /* to keep compiler quiet */
2737}
2738
2739/*
2740 * Maps an ReplicationSlotInvalidationCause to the invalidation
2741 * reason for a replication slot.
2742 */
2743const char *
2745{
2746 /* Search lookup table for the name of this cause */
2747 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2748 {
2749 if (SlotInvalidationCauses[i].cause == cause)
2751 }
2752
2753 Assert(false);
2754 return "none"; /* to keep compiler quiet */
2755}
2756
2757/*
2758 * A helper function to validate slots specified in GUC synchronized_standby_slots.
2759 *
2760 * The rawname will be parsed, and the result will be saved into *elemlist.
2761 */
2762static bool
2763validate_sync_standby_slots(char *rawname, List **elemlist)
2764{
2765 /* Verify syntax and parse string into a list of identifiers */
2766 if (!SplitIdentifierString(rawname, ',', elemlist))
2767 {
2768 GUC_check_errdetail("List syntax is invalid.");
2769 return false;
2770 }
2771
2772 /* Iterate the list to validate each slot name */
2773 foreach_ptr(char, name, *elemlist)
2774 {
2775 int err_code;
2776 char *err_msg = NULL;
2777 char *err_hint = NULL;
2778
2779 if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2780 &err_msg, &err_hint))
2781 {
2782 GUC_check_errcode(err_code);
2783 GUC_check_errdetail("%s", err_msg);
2784 if (err_hint != NULL)
2785 GUC_check_errhint("%s", err_hint);
2786 return false;
2787 }
2788 }
2789
2790 return true;
2791}
2792
2793/*
2794 * GUC check_hook for synchronized_standby_slots
2795 */
2796bool
2798{
2799 char *rawname;
2800 char *ptr;
2801 List *elemlist;
2802 int size;
2803 bool ok;
2805
2806 if ((*newval)[0] == '\0')
2807 return true;
2808
2809 /* Need a modifiable copy of the GUC string */
2810 rawname = pstrdup(*newval);
2811
2812 /* Now verify if the specified slots exist and have correct type */
2813 ok = validate_sync_standby_slots(rawname, &elemlist);
2814
2815 if (!ok || elemlist == NIL)
2816 {
2817 pfree(rawname);
2818 list_free(elemlist);
2819 return ok;
2820 }
2821
2822 /* Compute the size required for the SyncStandbySlotsConfigData struct */
2823 size = offsetof(SyncStandbySlotsConfigData, slot_names);
2824 foreach_ptr(char, slot_name, elemlist)
2825 size += strlen(slot_name) + 1;
2826
2827 /* GUC extra value must be guc_malloc'd, not palloc'd */
2828 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2829 if (!config)
2830 return false;
2831
2832 /* Transform the data into SyncStandbySlotsConfigData */
2833 config->nslotnames = list_length(elemlist);
2834
2835 ptr = config->slot_names;
2836 foreach_ptr(char, slot_name, elemlist)
2837 {
2838 strcpy(ptr, slot_name);
2839 ptr += strlen(slot_name) + 1;
2840 }
2841
2842 *extra = config;
2843
2844 pfree(rawname);
2845 list_free(elemlist);
2846 return true;
2847}
2848
2849/*
2850 * GUC assign_hook for synchronized_standby_slots
2851 */
2852void
2854{
2855 /*
2856 * The standby slots may have changed, so we must recompute the oldest
2857 * LSN.
2858 */
2860
2862}
2863
2864/*
2865 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2866 */
2867bool
2868SlotExistsInSyncStandbySlots(const char *slot_name)
2869{
2870 const char *standby_slot_name;
2871
2872 /* Return false if there is no value in synchronized_standby_slots */
2874 return false;
2875
2876 /*
2877 * XXX: We are not expecting this list to be long so a linear search
2878 * shouldn't hurt but if that turns out not to be true then we can cache
2879 * this information for each WalSender as well.
2880 */
2881 standby_slot_name = synchronized_standby_slots_config->slot_names;
2882 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2883 {
2884 if (strcmp(standby_slot_name, slot_name) == 0)
2885 return true;
2886
2887 standby_slot_name += strlen(standby_slot_name) + 1;
2888 }
2889
2890 return false;
2891}
2892
2893/*
2894 * Return true if the slots specified in synchronized_standby_slots have caught up to
2895 * the given WAL location, false otherwise.
2896 *
2897 * The elevel parameter specifies the error level used for logging messages
2898 * related to slots that do not exist, are invalidated, or are inactive.
2899 */
2900bool
2901StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
2902{
2903 const char *name;
2904 int caught_up_slot_num = 0;
2905 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2906
2907 /*
2908 * Don't need to wait for the standbys to catch up if there is no value in
2909 * synchronized_standby_slots.
2910 */
2912 return true;
2913
2914 /*
2915 * Don't need to wait for the standbys to catch up if we are on a standby
2916 * server, since we do not support syncing slots to cascading standbys.
2917 */
2918 if (RecoveryInProgress())
2919 return true;
2920
2921 /*
2922 * Don't need to wait for the standbys to catch up if they are already
2923 * beyond the specified WAL location.
2924 */
2926 ss_oldest_flush_lsn >= wait_for_lsn)
2927 return true;
2928
2929 /*
2930 * To prevent concurrent slot dropping and creation while filtering the
2931 * slots, take the ReplicationSlotControlLock outside of the loop.
2932 */
2933 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2934
2936 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2937 {
2938 XLogRecPtr restart_lsn;
2939 bool invalidated;
2940 bool inactive;
2941 ReplicationSlot *slot;
2942
2943 slot = SearchNamedReplicationSlot(name, false);
2944
2945 /*
2946 * If a slot name provided in synchronized_standby_slots does not
2947 * exist, report a message and exit the loop.
2948 */
2949 if (!slot)
2950 {
2951 ereport(elevel,
2952 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2953 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2954 name, "synchronized_standby_slots"),
2955 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2956 name),
2957 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2958 name, "synchronized_standby_slots"));
2959 break;
2960 }
2961
2962 /* Same as above: if a slot is not physical, exit the loop. */
2963 if (SlotIsLogical(slot))
2964 {
2965 ereport(elevel,
2966 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2967 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2968 name, "synchronized_standby_slots"),
2969 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2970 name),
2971 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2972 name, "synchronized_standby_slots"));
2973 break;
2974 }
2975
2976 SpinLockAcquire(&slot->mutex);
2977 restart_lsn = slot->data.restart_lsn;
2978 invalidated = slot->data.invalidated != RS_INVAL_NONE;
2979 inactive = slot->active_pid == 0;
2980 SpinLockRelease(&slot->mutex);
2981
2982 if (invalidated)
2983 {
2984 /* Specified physical slot has been invalidated */
2985 ereport(elevel,
2986 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2987 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2988 name, "synchronized_standby_slots"),
2989 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2990 name),
2991 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
2992 name, "synchronized_standby_slots"));
2993 break;
2994 }
2995
2996 if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2997 {
2998 /* Log a message if no active_pid for this physical slot */
2999 if (inactive)
3000 ereport(elevel,
3001 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3002 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3003 name, "synchronized_standby_slots"),
3004 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3005 name),
3006 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3007 name, "synchronized_standby_slots"));
3008
3009 /* Continue if the current slot hasn't caught up. */
3010 break;
3011 }
3012
3013 Assert(restart_lsn >= wait_for_lsn);
3014
3015 if (XLogRecPtrIsInvalid(min_restart_lsn) ||
3016 min_restart_lsn > restart_lsn)
3017 min_restart_lsn = restart_lsn;
3018
3019 caught_up_slot_num++;
3020
3021 name += strlen(name) + 1;
3022 }
3023
3024 LWLockRelease(ReplicationSlotControlLock);
3025
3026 /*
3027 * Return false if not all the standbys have caught up to the specified
3028 * WAL location.
3029 */
3030 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3031 return false;
3032
3033 /* The ss_oldest_flush_lsn must not retreat. */
3035 min_restart_lsn >= ss_oldest_flush_lsn);
3036
3037 ss_oldest_flush_lsn = min_restart_lsn;
3038
3039 return true;
3040}
3041
3042/*
3043 * Wait for physical standbys to confirm receiving the given lsn.
3044 *
3045 * Used by logical decoding SQL functions. It waits for physical standbys
3046 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3047 */
3048void
3050{
3051 /*
3052 * Don't need to wait for the standby to catch up if the current acquired
3053 * slot is not a logical failover slot, or there is no value in
3054 * synchronized_standby_slots.
3055 */
3057 return;
3058
3060
3061 for (;;)
3062 {
3064
3066 {
3067 ConfigReloadPending = false;
3069 }
3070
3071 /* Exit if done waiting for every slot. */
3072 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3073 break;
3074
3075 /*
3076 * Wait for the slots in the synchronized_standby_slots to catch up,
3077 * but use a timeout (1s) so we can also check if the
3078 * synchronized_standby_slots has been changed.
3079 */
3081 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3082 }
3083
3085}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1721
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition: timestamp.c:1795
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define NameStr(name)
Definition: c.h:755
#define ngettext(s, p, n)
Definition: c.h:1184
#define PG_BINARY
Definition: c.h:1276
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:474
uint64_t uint64
Definition: c.h:543
#define pg_unreachable()
Definition: c.h:335
uint32_t uint32
Definition: c.h:542
#define lengthof(array)
Definition: c.h:791
#define MemSet(start, val, len)
Definition: c.h:1023
uint32 TransactionId
Definition: c.h:661
size_t Size
Definition: c.h:614
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1243
int errcode_for_file_access(void)
Definition: elog.c:886
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errhint_internal(const char *fmt,...)
Definition: elog.c:1352
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define _(x)
Definition: elog.c:91
#define LOG
Definition: elog.h:31
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3975
int FreeDir(DIR *dir)
Definition: fd.c:3022
int CloseTransientFile(int fd)
Definition: fd.c:2868
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:753
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2904
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2970
int pg_fsync(int fd)
Definition: fd.c:386
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2691
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:547
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
bool IsBinaryUpgrade
Definition: globals.c:121
int MyProcPid
Definition: globals.c:47
bool IsUnderPostmaster
Definition: globals.c:120
Oid MyDatabaseId
Definition: globals.c:94
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
void GUC_check_errcode(int sqlerrcode)
Definition: guc.c:6617
void * guc_malloc(int elevel, size_t size)
Definition: guc.c:636
#define newval
#define GUC_check_errdetail
Definition: guc.h:505
GucSource
Definition: guc.h:112
@ PGC_SIGHUP
Definition: guc.h:75
#define GUC_check_errhint
Definition: guc.h:509
Assert(PointerIsAligned(start, uint64))
#define IS_INJECTION_POINT_ATTACHED(name)
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
int i
Definition: isn.c:77
bool IsLogicalLauncher(void)
Definition: launcher.c:1545
void list_free(List *list)
Definition: list.c:1546
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1977
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:2021
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:698
@ LW_SHARED
Definition: lwlock.h:113
@ LW_EXCLUSIVE
Definition: lwlock.h:112
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void pfree(void *pointer)
Definition: mcxt.c:1594
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
@ B_STARTUP
Definition: miscadmin.h:364
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
Oid GetUserId(void)
Definition: miscinit.c:469
BackendType MyBackendType
Definition: miscinit.c:64
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:688
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:42
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:153
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:158
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
static bool failover
static rewind_source * source
Definition: pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition: port.h:241
#define snprintf
Definition: port.h:239
uint64_t Datum
Definition: postgres.h:70
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3905
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition: procsignal.h:46
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:578
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:625
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition: slot.c:113
char * synchronized_standby_slots
Definition: slot.c:164
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition: slot.c:2853
#define ReplicationSlotOnDiskChecksummedSize
Definition: slot.c:135
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:2125
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:384
int idle_replication_slot_timeout_secs
Definition: slot.c:158
void ReplicationSlotDropAcquired(void)
Definition: slot.c:996
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1138
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition: slot.c:1746
void ReplicationSlotReserveWal(void)
Definition: slot.c:1571
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1382
struct SlotInvalidationCauseMap SlotInvalidationCauseMap
static XLogRecPtr ss_oldest_flush_lsn
Definition: slot.c:173
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition: slot.c:311
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1440
#define ReplicationSlotOnDiskNotChecksummedSize
Definition: slot.c:132
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1303
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition: slot.c:2724
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1177
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:2482
void ReplicationSlotPersist(void)
Definition: slot.c:1155
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition: slot.c:1648
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:2319
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:891
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2868
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition: slot.c:2763
void ReplicationSlotSave(void)
Definition: slot.c:1120
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:545
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:2258
#define ReplicationSlotOnDiskV2Size
Definition: slot.c:138
void CheckSlotPermissions(void)
Definition: slot.c:1554
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:594
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition: slot.c:2797
void ReplicationSlotsShmemInit(void)
Definition: slot.c:206
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:266
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:914
void ReplicationSlotRelease(void)
Definition: slot.c:763
int max_replication_slots
Definition: slot.c:151
StaticAssertDecl(lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:145
#define SLOT_VERSION
Definition: slot.c:142
struct ReplicationSlotOnDisk ReplicationSlotOnDisk
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition: slot.c:3049
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2901
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1233
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:852
void ReplicationSlotInitialize(void)
Definition: slot.c:241
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:1013
void StartupReplicationSlots(void)
Definition: slot.c:2197
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
Definition: slot.c:1837
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition: slot.c:1730
void CheckSlotRequirements(void)
Definition: slot.c:1532
#define SLOT_MAGIC
Definition: slot.c:141
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:2065
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition: slot.c:167
#define ReplicationSlotOnDiskConstantSize
Definition: slot.c:129
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:188
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition: slot.c:2744
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:250
static bool IsSlotForConflictCheck(const char *name)
Definition: slot.c:361
#define CONFLICT_DETECTION_SLOT
Definition: slot.h:28
#define RS_INVAL_MAX_CAUSES
Definition: slot.h:72
ReplicationSlotPersistency
Definition: slot.h:44
@ RS_PERSISTENT
Definition: slot.h:45
@ RS_EPHEMERAL
Definition: slot.h:46
@ RS_TEMPORARY
Definition: slot.h:47
#define SlotIsPhysical(slot)
Definition: slot.h:254
#define PG_REPLSLOT_DIR
Definition: slot.h:21
ReplicationSlotInvalidationCause
Definition: slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition: slot.h:68
@ RS_INVAL_HORIZON
Definition: slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:66
@ RS_INVAL_NONE
Definition: slot.h:60
#define SlotIsLogical(slot)
Definition: slot.h:255
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition: slot.h:273
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1674
#define SpinLockInit(lock)
Definition: spin.h:57
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1282
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: dirent.c:26
Definition: pg_list.h:54
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
ReplicationSlot replication_slots[1]
Definition: slot.h:266
uint32 version
Definition: slot.c:75
ReplicationSlotPersistentData slotdata
Definition: slot.c:83
pg_crc32c checksum
Definition: slot.c:72
TransactionId xmin
Definition: slot.h:96
TransactionId catalog_xmin
Definition: slot.h:104
XLogRecPtr confirmed_flush
Definition: slot.h:118
ReplicationSlotPersistency persistency
Definition: slot.h:88
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:110
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:208
TransactionId effective_catalog_xmin
Definition: slot.h:189
slock_t mutex
Definition: slot.h:165
XLogRecPtr candidate_restart_valid
Definition: slot.h:209
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:217
pid_t active_pid
Definition: slot.h:171
bool in_use
Definition: slot.h:168
TransactionId effective_xmin
Definition: slot.h:188
bool just_dirtied
Definition: slot.h:174
XLogRecPtr last_saved_restart_lsn
Definition: slot.h:250
XLogRecPtr candidate_restart_lsn
Definition: slot.h:210
LWLock io_in_progress_lock
Definition: slot.h:195
ConditionVariable active_cv
Definition: slot.h:198
TransactionId candidate_catalog_xmin
Definition: slot.h:207
bool dirty
Definition: slot.h:175
ReplicationSlotPersistentData data
Definition: slot.h:192
TimestampTz inactive_since
Definition: slot.h:224
const char * cause_name
Definition: slot.c:110
ReplicationSlotInvalidationCause cause
Definition: slot.c:109
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition: slot.c:101
ConditionVariable wal_confirm_rcv_cv
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
Definition: type.h:96
Definition: c.h:750
unsigned short st_mode
Definition: win32_port.h:258
#define InvalidTransactionId
Definition: transam.h:31
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.h:282
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:2744
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
const char * name
bool am_walsender
Definition: walsender.c:123
bool log_replication_commands
Definition: walsender.c:133
WalSndCtlData * WalSndCtl
Definition: walsender.c:117
#define stat
Definition: win32_port.h:274
#define S_ISDIR(m)
Definition: win32_port.h:315
#define kill(pid, sig)
Definition: win32_port.h:493
bool RecoveryInProgress(void)
Definition: xlog.c:6388
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3776
bool EnableHotStandby
Definition: xlog.c:122
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6491
int wal_level
Definition: xlog.c:132
int wal_segment_size
Definition: xlog.c:144
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2668
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9481
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2782
@ WAL_LEVEL_REPLICA
Definition: xlog.h:75
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:76
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:51
bool StandbyMode
Definition: xlogrecovery.c:149
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)