PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
slot.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the directory
24 * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 * contain the slot's own data. Additional data can be stored alongside that
26 * file if required. While the server is running, the state data is also
27 * cached in memory for efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include <unistd.h>
40#include <sys/stat.h>
41
42#include "access/transam.h"
44#include "access/xlogrecovery.h"
45#include "common/file_utils.h"
46#include "common/string.h"
47#include "miscadmin.h"
48#include "pgstat.h"
51#include "replication/slot.h"
53#include "storage/fd.h"
54#include "storage/ipc.h"
55#include "storage/proc.h"
56#include "storage/procarray.h"
57#include "utils/builtins.h"
58#include "utils/guc_hooks.h"
59#include "utils/varlena.h"
60
61/*
62 * Replication slot on-disk data structure.
63 */
65{
66 /* first part of this struct needs to be version independent */
67
68 /* data not covered by checksum */
71
72 /* data covered by checksum */
75
76 /*
77 * The actual data in the slot that follows can differ based on the above
78 * 'version'.
79 */
80
83
84/*
85 * Struct for the configuration of synchronized_standby_slots.
86 *
87 * Note: this must be a flat representation that can be held in a single chunk
88 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
89 * synchronized_standby_slots GUC.
90 */
91typedef struct
92{
93 /* Number of slot names in the slot_names[] */
95
96 /*
97 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
98 */
99 char slot_names[FLEXIBLE_ARRAY_MEMBER];
101
102/*
103 * Lookup table for slot invalidation causes.
104 */
105const char *const SlotInvalidationCauses[] = {
/*
 * Each entry is placed with a designated initializer: the array index is an
 * RS_INVAL_* constant and the element is the textual name for that cause.
 */
106 [RS_INVAL_NONE] = "none",
107 [RS_INVAL_WAL_REMOVED] = "wal_removed",
108 [RS_INVAL_HORIZON] = "rows_removed",
109 [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
110};
111
112/* Maximum number of invalidation causes */
113#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
114
116 "array length mismatch");
117
/*
 * Size helpers describing the layout of ReplicationSlotOnDisk.
 *
 * Each expansion is fully parenthesized so that a macro behaves as a single
 * operand when it is used inside a larger expression (for example
 * "2 * ReplicationSlotOnDiskV2Size"); without the parentheses the subtraction
 * in the expansion would bind incorrectly with neighbouring operators.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	(offsetof(ReplicationSlotOnDisk, slotdata))
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	(offsetof(ReplicationSlotOnDisk, version))
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)
130
131#define SLOT_MAGIC 0x1051CA1 /* format identifier */
132#define SLOT_VERSION 5 /* version for new files */
133
134/* Control array for replication slot management */
136
137/* My backend's replication slot in the shared memory array */
139
140/* GUC variables */
141int max_replication_slots = 10; /* the maximum number of replication
142 * slots */
143
144/*
145 * This GUC lists streaming replication standby server slot names that
146 * logical WAL sender processes will wait for.
147 */
149
150/* This is the parsed and cached configuration for synchronized_standby_slots */
152
153/*
154 * Oldest LSN that has been confirmed to be flushed to the standbys
155 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
156 */
158
159static void ReplicationSlotShmemExit(int code, Datum arg);
160static void ReplicationSlotDropPtr(ReplicationSlot *slot);
161
162/* internal persistency functions */
163static void RestoreSlotFromDisk(const char *name);
164static void CreateSlotOnDisk(ReplicationSlot *slot);
165static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
166
167/*
168 * Report shared-memory space needed by ReplicationSlotsShmemInit.
169 */
170Size
172{
173 Size size = 0;
174
175 if (max_replication_slots == 0)
176 return size;
177
178 size = offsetof(ReplicationSlotCtlData, replication_slots);
181
182 return size;
183}
184
185/*
186 * Allocate and initialize shared memory for replication slots.
187 */
188void
190{
191 bool found;
192
193 if (max_replication_slots == 0)
194 return;
195
197 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
198 &found);
199
200 if (!found)
201 {
202 int i;
203
204 /* First time through, so initialize */
206
207 for (i = 0; i < max_replication_slots; i++)
208 {
210
211 /* everything else is zeroed by the memset above */
212 SpinLockInit(&slot->mutex);
216 }
217 }
218}
219
220/*
221 * Register the callback for replication slot cleanup and releasing.
222 */
223void
225{
227}
228
229/*
230 * Release and cleanup replication slots.
231 */
232static void
234{
235 /* Make sure active replication slots are released */
236 if (MyReplicationSlot != NULL)
238
239 /* Also cleanup all the temporary slots. */
241}
242
243/*
244 * Check whether the passed slot name is valid and report errors at elevel.
245 *
246 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
247 * the name to be used as a directory name on every supported OS.
248 *
249 * Returns whether the directory name is valid or not if elevel < ERROR.
250 */
251bool
252ReplicationSlotValidateName(const char *name, int elevel)
253{
254 const char *cp;
255
256 if (strlen(name) == 0)
257 {
258 ereport(elevel,
259 (errcode(ERRCODE_INVALID_NAME),
260 errmsg("replication slot name \"%s\" is too short",
261 name)));
262 return false;
263 }
264
265 if (strlen(name) >= NAMEDATALEN)
266 {
267 ereport(elevel,
268 (errcode(ERRCODE_NAME_TOO_LONG),
269 errmsg("replication slot name \"%s\" is too long",
270 name)));
271 return false;
272 }
273
274 for (cp = name; *cp; cp++)
275 {
276 if (!((*cp >= 'a' && *cp <= 'z')
277 || (*cp >= '0' && *cp <= '9')
278 || (*cp == '_')))
279 {
280 ereport(elevel,
281 (errcode(ERRCODE_INVALID_NAME),
282 errmsg("replication slot name \"%s\" contains invalid character",
283 name),
284 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
285 return false;
286 }
287 }
288 return true;
289}
290
291/*
292 * Create a new replication slot and mark it as used by this backend.
293 *
294 * name: Name of the slot
295 * db_specific: logical decoding is db specific; if the slot is going to
296 * be used for that pass true, otherwise false.
297 * two_phase: Allows decoding of prepared transactions. We allow this option
298 * to be enabled only at the slot creation time. If we allow this option
299 * to be changed during decoding then it is quite possible that we skip
300 * prepare first time because this option was not enabled. Now next time
301 * during getting changes, if the two_phase option is enabled it can skip
302 * prepare because by that time start decoding point has been moved. So the
303 * user will only get commit prepared.
304 * failover: If enabled, allows the slot to be synced to standbys so
305 * that logical replication can be resumed after failover.
306 * synced: True if the slot is synchronized from the primary server.
307 */
308void
309ReplicationSlotCreate(const char *name, bool db_specific,
310 ReplicationSlotPersistency persistency,
311 bool two_phase, bool failover, bool synced)
312{
313 ReplicationSlot *slot = NULL;
314 int i;
315
316 Assert(MyReplicationSlot == NULL);
317
319
320 if (failover)
321 {
322 /*
323 * Do not allow users to create the failover enabled slots on the
324 * standby as we do not support sync to the cascading standby.
325 *
326 * However, failover enabled slots can be created during slot
327 * synchronization because we need to retain the same values as the
328 * remote slot.
329 */
332 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
333 errmsg("cannot enable failover for a replication slot created on the standby"));
334
335 /*
336 * Do not allow users to create failover enabled temporary slots,
337 * because temporary slots will not be synced to the standby.
338 *
339 * However, failover enabled temporary slots can be created during
340 * slot synchronization. See the comments atop slotsync.c for details.
341 */
342 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
344 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
345 errmsg("cannot enable failover for a temporary replication slot"));
346 }
347
348 /*
349 * If some other backend ran this code concurrently with us, we'd likely
350 * both allocate the same slot, and that would be bad. We'd also be at
351 * risk of missing a name collision. Also, we don't want to try to create
352 * a new slot while somebody's busy cleaning up an old one, because we
353 * might both be monkeying with the same directory.
354 */
355 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
356
357 /*
358 * Check for name collision, and identify an allocatable slot. We need to
359 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
360 * else can change the in_use flags while we're looking at them.
361 */
362 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
363 for (i = 0; i < max_replication_slots; i++)
364 {
366
367 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
370 errmsg("replication slot \"%s\" already exists", name)));
371 if (!s->in_use && slot == NULL)
372 slot = s;
373 }
374 LWLockRelease(ReplicationSlotControlLock);
375
376 /* If all slots are in use, we're out of luck. */
377 if (slot == NULL)
379 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
380 errmsg("all replication slots are in use"),
381 errhint("Free one or increase \"max_replication_slots\".")));
382
383 /*
384 * Since this slot is not in use, nobody should be looking at any part of
385 * it other than the in_use field unless they're trying to allocate it.
386 * And since we hold ReplicationSlotAllocationLock, nobody except us can
387 * be doing that. So it's safe to initialize the slot.
388 */
389 Assert(!slot->in_use);
390 Assert(slot->active_pid == 0);
391
392 /* first initialize persistent data */
393 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
394 namestrcpy(&slot->data.name, name);
395 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
396 slot->data.persistency = persistency;
397 slot->data.two_phase = two_phase;
399 slot->data.failover = failover;
400 slot->data.synced = synced;
401
402 /* and then data only present in shared memory */
403 slot->just_dirtied = false;
404 slot->dirty = false;
412 slot->inactive_since = 0;
413
414 /*
415 * Create the slot on disk. We haven't actually marked the slot allocated
416 * yet, so no special cleanup is required if this errors out.
417 */
418 CreateSlotOnDisk(slot);
419
420 /*
421 * We need to briefly prevent any other backend from iterating over the
422 * slots while we flip the in_use flag. We also need to set the active
423 * flag while holding the ControlLock as otherwise a concurrent
424 * ReplicationSlotAcquire() could acquire the slot as well.
425 */
426 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
427
428 slot->in_use = true;
429
430 /* We can now mark the slot active, and that makes it our slot. */
431 SpinLockAcquire(&slot->mutex);
432 Assert(slot->active_pid == 0);
433 slot->active_pid = MyProcPid;
434 SpinLockRelease(&slot->mutex);
435 MyReplicationSlot = slot;
436
437 LWLockRelease(ReplicationSlotControlLock);
438
439 /*
440 * Create statistics entry for the new logical slot. We don't collect any
441 * stats for physical slots, so no need to create an entry for the same.
442 * See ReplicationSlotDropPtr for why we need to do this before releasing
443 * ReplicationSlotAllocationLock.
444 */
445 if (SlotIsLogical(slot))
447
448 /*
449 * Now that the slot has been marked as in_use and active, it's safe to
450 * let somebody else try to allocate a slot.
451 */
452 LWLockRelease(ReplicationSlotAllocationLock);
453
454 /* Let everybody know we've modified this slot */
456}
457
458/*
459 * Search for the named replication slot.
460 *
461 * Return the replication slot if found, otherwise NULL.
462 */
464SearchNamedReplicationSlot(const char *name, bool need_lock)
465{
466 int i;
467 ReplicationSlot *slot = NULL;
468
469 if (need_lock)
470 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
471
472 for (i = 0; i < max_replication_slots; i++)
473 {
475
476 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
477 {
478 slot = s;
479 break;
480 }
481 }
482
483 if (need_lock)
484 LWLockRelease(ReplicationSlotControlLock);
485
486 return slot;
487}
488
489/*
490 * Return the index of the replication slot in
491 * ReplicationSlotCtl->replication_slots.
492 *
493 * This is mainly useful to have an efficient key for storing replication slot
494 * stats.
495 */
496int
498{
500 slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
501
503}
504
505/*
506 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
507 * the slot's name and true is returned.
508 *
509 * This likely is only useful for pgstat_replslot.c during shutdown, in other
510 * cases there are obvious TOCTOU issues.
511 */
512bool
514{
515 ReplicationSlot *slot;
516 bool found;
517
519
520 /*
521 * Ensure that the slot cannot be dropped while we copy the name. Don't
522 * need the spinlock as the name of an existing slot cannot change.
523 */
524 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
525 found = slot->in_use;
526 if (slot->in_use)
528 LWLockRelease(ReplicationSlotControlLock);
529
530 return found;
531}
532
533/*
534 * Find a previously created slot and mark it as used by this process.
535 *
536 * An error is raised if nowait is true and the slot is currently in use. If
537 * nowait is false, we sleep until the slot is released by the owning process.
538 */
539void
540ReplicationSlotAcquire(const char *name, bool nowait)
541{
543 int active_pid;
544
545 Assert(name != NULL);
546
547retry:
548 Assert(MyReplicationSlot == NULL);
549
550 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
551
552 /* Check if the slot exists with the given name. */
554 if (s == NULL || !s->in_use)
555 {
556 LWLockRelease(ReplicationSlotControlLock);
557
559 (errcode(ERRCODE_UNDEFINED_OBJECT),
560 errmsg("replication slot \"%s\" does not exist",
561 name)));
562 }
563
564 /*
565 * This is the slot we want; check if it's active under some other
566 * process. In single user mode, we don't need this check.
567 */
569 {
570 /*
571 * Get ready to sleep on the slot in case it is active. (We may end
572 * up not sleeping, but we don't want to do this while holding the
573 * spinlock.)
574 */
575 if (!nowait)
577
579 if (s->active_pid == 0)
581 active_pid = s->active_pid;
583 }
584 else
585 active_pid = MyProcPid;
586 LWLockRelease(ReplicationSlotControlLock);
587
588 /*
589 * If we found the slot but it's already active in another process, we
590 * wait until the owning process signals us that it's been released, or
591 * error out.
592 */
593 if (active_pid != MyProcPid)
594 {
595 if (!nowait)
596 {
597 /* Wait here until we get signaled, and then restart */
599 WAIT_EVENT_REPLICATION_SLOT_DROP);
601 goto retry;
602 }
603
605 (errcode(ERRCODE_OBJECT_IN_USE),
606 errmsg("replication slot \"%s\" is active for PID %d",
607 NameStr(s->data.name), active_pid)));
608 }
609 else if (!nowait)
610 ConditionVariableCancelSleep(); /* no sleep needed after all */
611
612 /* Let everybody know we've modified this slot */
614
615 /* We made this slot active, so it's ours now. */
617
618 /*
619 * The call to pgstat_acquire_replslot() protects against stats for a
620 * different slot, from before a restart or such, being present during
621 * pgstat_report_replslot().
622 */
623 if (SlotIsLogical(s))
625
626 /*
627 * Reset the time since the slot has become inactive as the slot is active
628 * now.
629 */
631 s->inactive_since = 0;
633
634 if (am_walsender)
635 {
638 ? errmsg("acquired logical replication slot \"%s\"",
639 NameStr(s->data.name))
640 : errmsg("acquired physical replication slot \"%s\"",
641 NameStr(s->data.name)));
642 }
643}
644
645/*
646 * Release the replication slot that this backend considers itself to own.
647 *
648 * This or another backend can re-acquire the slot later.
649 * Resources this slot requires will be preserved.
650 */
651void
653{
655 char *slotname = NULL; /* keep compiler quiet */
656 bool is_logical = false; /* keep compiler quiet */
657 TimestampTz now = 0;
658
659 Assert(slot != NULL && slot->active_pid != 0);
660
661 if (am_walsender)
662 {
663 slotname = pstrdup(NameStr(slot->data.name));
664 is_logical = SlotIsLogical(slot);
665 }
666
667 if (slot->data.persistency == RS_EPHEMERAL)
668 {
669 /*
670 * Delete the slot. There is no !PANIC case where this is allowed to
671 * fail, all that may happen is an incomplete cleanup of the on-disk
672 * data.
673 */
675 }
676
677 /*
678 * If slot needed to temporarily restrain both data and catalog xmin to
679 * create the catalog snapshot, remove that temporary constraint.
680 * Snapshots can only be exported while the initial snapshot is still
681 * acquired.
682 */
683 if (!TransactionIdIsValid(slot->data.xmin) &&
685 {
686 SpinLockAcquire(&slot->mutex);
688 SpinLockRelease(&slot->mutex);
690 }
691
692 /*
693 * Set the time since the slot has become inactive. We get the current
694 * time beforehand to avoid system call while holding the spinlock.
695 */
697
698 if (slot->data.persistency == RS_PERSISTENT)
699 {
700 /*
701 * Mark persistent slot inactive. We're not freeing it, just
702 * disconnecting, but wake up others that may be waiting for it.
703 */
704 SpinLockAcquire(&slot->mutex);
705 slot->active_pid = 0;
706 slot->inactive_since = now;
707 SpinLockRelease(&slot->mutex);
709 }
710 else
711 {
712 SpinLockAcquire(&slot->mutex);
713 slot->inactive_since = now;
714 SpinLockRelease(&slot->mutex);
715 }
716
717 MyReplicationSlot = NULL;
718
719 /* might not have been set when we've been a plain slot */
720 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
721 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
723 LWLockRelease(ProcArrayLock);
724
725 if (am_walsender)
726 {
728 is_logical
729 ? errmsg("released logical replication slot \"%s\"",
730 slotname)
731 : errmsg("released physical replication slot \"%s\"",
732 slotname));
733
734 pfree(slotname);
735 }
736}
737
738/*
739 * Cleanup temporary slots created in current session.
740 *
741 * Cleanup only synced temporary slots if 'synced_only' is true, else
742 * cleanup all temporary slots.
743 */
744void
745ReplicationSlotCleanup(bool synced_only)
746{
747 int i;
748
749 Assert(MyReplicationSlot == NULL);
750
751restart:
752 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
753 for (i = 0; i < max_replication_slots; i++)
754 {
756
757 if (!s->in_use)
758 continue;
759
761 if ((s->active_pid == MyProcPid &&
762 (!synced_only || s->data.synced)))
763 {
766 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
767
769
771 goto restart;
772 }
773 else
775 }
776
777 LWLockRelease(ReplicationSlotControlLock);
778}
779
780/*
781 * Permanently drop replication slot identified by the passed in name.
782 */
783void
784ReplicationSlotDrop(const char *name, bool nowait)
785{
786 Assert(MyReplicationSlot == NULL);
787
789
790 /*
791 * Do not allow users to drop the slots which are currently being synced
792 * from the primary to the standby.
793 */
796 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
797 errmsg("cannot drop replication slot \"%s\"", name),
798 errdetail("This replication slot is being synchronized from the primary server."));
799
801}
802
803/*
804 * Change the definition of the slot identified by the specified name.
805 */
806void
807ReplicationSlotAlter(const char *name, const bool *failover,
808 const bool *two_phase)
809{
810 bool update_slot = false;
811
812 Assert(MyReplicationSlot == NULL);
813 Assert(failover || two_phase);
814
816
819 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
820 errmsg("cannot use %s with a physical replication slot",
821 "ALTER_REPLICATION_SLOT"));
822
825 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
826 errmsg("cannot alter invalid replication slot \"%s\"", name),
827 errdetail("This replication slot has been invalidated due to \"%s\".",
829
830 if (RecoveryInProgress())
831 {
832 /*
833 * Do not allow users to alter the slots which are currently being
834 * synced from the primary to the standby.
835 */
838 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
839 errmsg("cannot alter replication slot \"%s\"", name),
840 errdetail("This replication slot is being synchronized from the primary server."));
841
842 /*
843 * Do not allow users to enable failover on the standby as we do not
844 * support sync to the cascading standby.
845 */
846 if (failover && *failover)
848 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
849 errmsg("cannot enable failover for a replication slot"
850 " on the standby"));
851 }
852
853 if (failover)
854 {
855 /*
856 * Do not allow users to enable failover for temporary slots as we do
857 * not support syncing temporary slots to the standby.
858 */
859 if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
861 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
862 errmsg("cannot enable failover for a temporary replication slot"));
863
864 if (MyReplicationSlot->data.failover != *failover)
865 {
867 MyReplicationSlot->data.failover = *failover;
869
870 update_slot = true;
871 }
872 }
873
875 {
879
880 update_slot = true;
881 }
882
883 if (update_slot)
884 {
887 }
888
890}
891
892/*
893 * Permanently drop the currently acquired replication slot.
894 */
895void
897{
899
900 Assert(MyReplicationSlot != NULL);
901
902 /* slot isn't acquired anymore */
903 MyReplicationSlot = NULL;
904
906}
907
908/*
909 * Permanently drop the replication slot which will be released by the point
910 * this function returns.
911 */
912static void
914{
915 char path[MAXPGPATH];
916 char tmppath[MAXPGPATH];
917
918 /*
919 * If some other backend ran this code concurrently with us, we might try
920 * to delete a slot with a certain name while someone else was trying to
921 * create a slot with the same name.
922 */
923 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
924
925 /* Generate pathnames. */
926 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
927 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
928
929 /*
930 * Rename the slot directory on disk, so that we'll no longer recognize
931 * this as a valid slot. Note that if this fails, we've got to mark the
932 * slot inactive before bailing out. If we're dropping an ephemeral or a
933 * temporary slot, we better never fail hard as the caller won't expect
934 * the slot to survive and this might get called during error handling.
935 */
936 if (rename(path, tmppath) == 0)
937 {
938 /*
939 * We need to fsync() the directory we just renamed and its parent to
940 * make sure that our changes are on disk in a crash-safe fashion. If
941 * fsync() fails, we can't be sure whether the changes are on disk or
942 * not. For now, we handle that by panicking;
943 * StartupReplicationSlots() will try to straighten it out after
944 * restart.
945 */
947 fsync_fname(tmppath, true);
950 }
951 else
952 {
953 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
954
955 SpinLockAcquire(&slot->mutex);
956 slot->active_pid = 0;
957 SpinLockRelease(&slot->mutex);
958
959 /* wake up anyone waiting on this slot */
961
962 ereport(fail_softly ? WARNING : ERROR,
964 errmsg("could not rename file \"%s\" to \"%s\": %m",
965 path, tmppath)));
966 }
967
968 /*
969 * The slot is definitely gone. Lock out concurrent scans of the array
970 * long enough to kill it. It's OK to clear the active PID here without
971 * grabbing the mutex because nobody else can be scanning the array here,
972 * and nobody can be attached to this slot and thus access it without
973 * scanning the array.
974 *
975 * Also wake up processes waiting for it.
976 */
977 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
978 slot->active_pid = 0;
979 slot->in_use = false;
980 LWLockRelease(ReplicationSlotControlLock);
982
983 /*
984 * Slot is dead and doesn't prevent resource removal anymore, recompute
985 * limits.
986 */
989
990 /*
991 * If removing the directory fails, the worst thing that will happen is
992 * that the user won't be able to create a new slot with the same name
993 * until the next server restart. We warn about it, but that's all.
994 */
995 if (!rmtree(tmppath, true))
997 (errmsg("could not remove directory \"%s\"", tmppath)));
998
999 /*
1000 * Drop the statistics entry for the replication slot. Do this while
1001 * holding ReplicationSlotAllocationLock so that we don't drop a
1002 * statistics entry for another slot with the same name just created in
1003 * another session.
1004 */
1005 if (SlotIsLogical(slot))
1007
1008 /*
1009 * We release this at the very end, so that nobody starts trying to create
1010 * a slot while we're still cleaning up the detritus of the old one.
1011 */
1012 LWLockRelease(ReplicationSlotAllocationLock);
1013}
1014
1015/*
1016 * Serialize the currently acquired slot's state from memory to disk, thereby
1017 * guaranteeing the current state will survive a crash.
1018 */
1019void
1021{
1022 char path[MAXPGPATH];
1023
1024 Assert(MyReplicationSlot != NULL);
1025
1028}
1029
1030/*
1031 * Signal that it would be useful if the currently acquired slot would be
1032 * flushed out to disk.
1033 *
1034 * Note that the actual flush to disk can be delayed for a long time; if it
1035 * is required for correctness, explicitly do a ReplicationSlotSave().
1036 */
1037void
1039{
1041
1042 Assert(MyReplicationSlot != NULL);
1043
1044 SpinLockAcquire(&slot->mutex);
1046 MyReplicationSlot->dirty = true;
1047 SpinLockRelease(&slot->mutex);
1048}
1049
1050/*
1051 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1052 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1053 */
1054void
1056{
1058
1059 Assert(slot != NULL);
1061
1062 SpinLockAcquire(&slot->mutex);
1064 SpinLockRelease(&slot->mutex);
1065
1068}
1069
1070/*
1071 * Compute the oldest xmin across all slots and store it in the ProcArray.
1072 *
1073 * If already_locked is true, ProcArrayLock has already been acquired
1074 * exclusively.
1075 */
1076void
1078{
1079 int i;
1081 TransactionId agg_catalog_xmin = InvalidTransactionId;
1082
1083 Assert(ReplicationSlotCtl != NULL);
1084
1085 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1086
1087 for (i = 0; i < max_replication_slots; i++)
1088 {
1090 TransactionId effective_xmin;
1091 TransactionId effective_catalog_xmin;
1092 bool invalidated;
1093
1094 if (!s->in_use)
1095 continue;
1096
1098 effective_xmin = s->effective_xmin;
1099 effective_catalog_xmin = s->effective_catalog_xmin;
1100 invalidated = s->data.invalidated != RS_INVAL_NONE;
1102
1103 /* invalidated slots need not apply */
1104 if (invalidated)
1105 continue;
1106
1107 /* check the data xmin */
1108 if (TransactionIdIsValid(effective_xmin) &&
1109 (!TransactionIdIsValid(agg_xmin) ||
1110 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1111 agg_xmin = effective_xmin;
1112
1113 /* check the catalog xmin */
1114 if (TransactionIdIsValid(effective_catalog_xmin) &&
1115 (!TransactionIdIsValid(agg_catalog_xmin) ||
1116 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1117 agg_catalog_xmin = effective_catalog_xmin;
1118 }
1119
1120 LWLockRelease(ReplicationSlotControlLock);
1121
1122 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1123}
1124
1125/*
1126 * Compute the oldest restart LSN across all slots and inform xlog module.
1127 *
1128 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1129 * purpose, we don't try to account for that, because this module doesn't
1130 * know what to compare against.
1131 */
1132void
1134{
1135 int i;
1136 XLogRecPtr min_required = InvalidXLogRecPtr;
1137
1138 Assert(ReplicationSlotCtl != NULL);
1139
1140 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1141 for (i = 0; i < max_replication_slots; i++)
1142 {
1144 XLogRecPtr restart_lsn;
1145 bool invalidated;
1146
1147 if (!s->in_use)
1148 continue;
1149
1151 restart_lsn = s->data.restart_lsn;
1152 invalidated = s->data.invalidated != RS_INVAL_NONE;
1154
1155 /* invalidated slots need not apply */
1156 if (invalidated)
1157 continue;
1158
1159 if (restart_lsn != InvalidXLogRecPtr &&
1160 (min_required == InvalidXLogRecPtr ||
1161 restart_lsn < min_required))
1162 min_required = restart_lsn;
1163 }
1164 LWLockRelease(ReplicationSlotControlLock);
1165
1167}
1168
1169/*
1170 * Compute the oldest WAL LSN required by *logical* decoding slots.
1171 *
1172 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1173 * slots exist.
1174 *
1175 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1176 * ignores physical replication slots.
1177 *
1178 * The results aren't required frequently, so we don't maintain a precomputed
1179 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1180 */
1183{
1185 int i;
1186
1187 if (max_replication_slots <= 0)
1188 return InvalidXLogRecPtr;
1189
1190 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1191
1192 for (i = 0; i < max_replication_slots; i++)
1193 {
1194 ReplicationSlot *s;
1195 XLogRecPtr restart_lsn;
1196 bool invalidated;
1197
1199
1200 /* cannot change while ReplicationSlotControlLock is held */
1201 if (!s->in_use)
1202 continue;
1203
1204 /* we're only interested in logical slots */
1205 if (!SlotIsLogical(s))
1206 continue;
1207
1208 /* read once, it's ok if it increases while we're checking */
1210 restart_lsn = s->data.restart_lsn;
1211 invalidated = s->data.invalidated != RS_INVAL_NONE;
1213
1214 /* invalidated slots need not apply */
1215 if (invalidated)
1216 continue;
1217
1218 if (restart_lsn == InvalidXLogRecPtr)
1219 continue;
1220
1221 if (result == InvalidXLogRecPtr ||
1222 restart_lsn < result)
1223 result = restart_lsn;
1224 }
1225
1226 LWLockRelease(ReplicationSlotControlLock);
1227
1228 return result;
1229}
1230
1231/*
1232 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1233 * passed database oid.
1234 *
1235 * Returns true if there are any slots referencing the database. *nslots will
1236 * be set to the absolute number of slots in the database, *nactive to ones
1237 * currently active.
1238 */
1239bool
1240ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1241{
1242 int i;
1243
1244 *nslots = *nactive = 0;
1245
1246 if (max_replication_slots <= 0)
1247 return false;
1248
1249 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1250 for (i = 0; i < max_replication_slots; i++)
1251 {
1252 ReplicationSlot *s;
1253
1255
1256 /* cannot change while ReplicationSlotControlLock is held */
1257 if (!s->in_use)
1258 continue;
1259
1260 /* only logical slots are database specific, skip */
1261 if (!SlotIsLogical(s))
1262 continue;
1263
1264 /* not our database, skip */
1265 if (s->data.database != dboid)
1266 continue;
1267
1268 /* NB: intentionally counting invalidated slots */
1269
1270 /* count slots with spinlock held */
1272 (*nslots)++;
1273 if (s->active_pid != 0)
1274 (*nactive)++;
1276 }
1277 LWLockRelease(ReplicationSlotControlLock);
1278
1279 if (*nslots > 0)
1280 return true;
1281 return false;
1282}
1283
1284/*
1285 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1286 * passed database oid. The caller should hold an exclusive lock on the
1287 * pg_database oid for the database to prevent creation of new slots on the db
1288 * or replay from existing slots.
1289 *
1290 * Another session that concurrently acquires an existing slot on the target DB
1291 * (most likely to drop it) may cause this function to ERROR. If that happens
1292 * it may have dropped some but not all slots.
1293 *
1294 * This routine isn't as efficient as it could be - but we don't drop
1295 * databases often, especially databases with lots of slots.
1296 */
1297void
1299{
1300 int i;
1301
1302 if (max_replication_slots <= 0)
1303 return;
1304
1305restart:
1306 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1307 for (i = 0; i < max_replication_slots; i++)
1308 {
1309 ReplicationSlot *s;
1310 char *slotname;
1311 int active_pid;
1312
1314
1315 /* cannot change while ReplicationSlotControlLock is held */
1316 if (!s->in_use)
1317 continue;
1318
1319 /* only logical slots are database specific, skip */
1320 if (!SlotIsLogical(s))
1321 continue;
1322
1323 /* not our database, skip */
1324 if (s->data.database != dboid)
1325 continue;
1326
1327 /* NB: intentionally including invalidated slots */
1328
1329 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1331 /* can't change while ReplicationSlotControlLock is held */
1332 slotname = NameStr(s->data.name);
1333 active_pid = s->active_pid;
1334 if (active_pid == 0)
1335 {
1337 s->active_pid = MyProcPid;
1338 }
1340
1341 /*
1342 * Even though we hold an exclusive lock on the database object a
1343 * logical slot for that DB can still be active, e.g. if it's
1344 * concurrently being dropped by a backend connected to another DB.
1345 *
1346 * That's fairly unlikely in practice, so we'll just bail out.
1347 *
1348 * The slot sync worker holds a shared lock on the database before
1349 * operating on synced logical slots to avoid conflict with the drop
1350 * happening here. The persistent synced slots are thus safe but there
1351 * is a possibility that the slot sync worker has created a temporary
1352 * slot (which stays active even on release) and we are trying to drop
1353 * that here. In practice, the chances of hitting this scenario are
1354 * low because, during slot synchronization, the temporary slot is
1355 * immediately converted to persistent and thus is safe due to the
1356 * shared lock taken on the database. So, we'll just bail out in such
1357 * a case.
1358 *
1359 * XXX: We can consider shutting down the slot sync worker before
1360 * trying to drop synced temporary slots here.
1361 */
1362 if (active_pid)
1363 ereport(ERROR,
1364 (errcode(ERRCODE_OBJECT_IN_USE),
1365 errmsg("replication slot \"%s\" is active for PID %d",
1366 slotname, active_pid)));
1367
1368 /*
1369 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1370 * holding ReplicationSlotControlLock over filesystem operations,
1371 * release ReplicationSlotControlLock and use
1372 * ReplicationSlotDropAcquired.
1373 *
1374 * As that means the set of slots could change, restart scan from the
1375 * beginning each time we release the lock.
1376 */
1377 LWLockRelease(ReplicationSlotControlLock);
1379 goto restart;
1380 }
1381 LWLockRelease(ReplicationSlotControlLock);
1382}
1383
1384
1385/*
1386 * Check whether the server's configuration supports using replication
1387 * slots.
1388 */
1389void
1391{
1392 /*
1393 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1394 * needs the same check.
1395 */
1396
1397 if (max_replication_slots == 0)
1398 ereport(ERROR,
1399 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1400 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1401
1403 ereport(ERROR,
1404 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1405 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1406}
1407
1408/*
1409 * Check whether the user has privilege to use replication slots.
1410 */
1411void
1413{
1415 ereport(ERROR,
1416 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1417 errmsg("permission denied to use replication slots"),
1418 errdetail("Only roles with the %s attribute may use replication slots.",
1419 "REPLICATION")));
1420}
1421
1422/*
1423 * Reserve WAL for the currently active slot.
1424 *
1425 * Compute and set restart_lsn in a manner that's appropriate for the type of
1426 * the slot and concurrency safe.
1427 */
1428void
1430{
1432
1433 Assert(slot != NULL);
1435
1436 /*
1437 * The replication slot mechanism is used to prevent removal of required
1438 * WAL. As there is no interlock between this routine and checkpoints, WAL
1439 * segments could concurrently be removed when a now stale return value of
1440 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1441 * this happens we'll just retry.
1442 */
1443 while (true)
1444 {
1445 XLogSegNo segno;
1446 XLogRecPtr restart_lsn;
1447
1448 /*
1449 * For logical slots log a standby snapshot and start logical decoding
1450 * at exactly that position. That allows the slot to start up more
1451 * quickly. But on a standby we cannot do WAL writes, so just use the
1452 * replay pointer; effectively, an attempt to create a logical slot on
1453 * standby will cause it to wait for an xl_running_xact record to be
1454 * logged independently on the primary, so that a snapshot can be
1455 * built using the record.
1456 *
1457 * None of this is needed (or indeed helpful) for physical slots as
1458 * they'll start replay at the last logged checkpoint anyway. Instead
1459 * return the location of the last redo LSN. While that slightly
1460 * increases the chance that we have to retry, it's where a base
1461 * backup has to start replay at.
1462 */
1463 if (SlotIsPhysical(slot))
1464 restart_lsn = GetRedoRecPtr();
1465 else if (RecoveryInProgress())
1466 restart_lsn = GetXLogReplayRecPtr(NULL);
1467 else
1468 restart_lsn = GetXLogInsertRecPtr();
1469
1470 SpinLockAcquire(&slot->mutex);
1471 slot->data.restart_lsn = restart_lsn;
1472 SpinLockRelease(&slot->mutex);
1473
1474 /* prevent WAL removal as fast as possible */
1476
1477 /*
1478 * If all required WAL is still there, great, otherwise retry. The
1479 * slot should prevent further removal of WAL, unless there's a
1480 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1481 * the new restart_lsn above, so normally we should never need to loop
1482 * more than twice.
1483 */
1485 if (XLogGetLastRemovedSegno() < segno)
1486 break;
1487 }
1488
1489 if (!RecoveryInProgress() && SlotIsLogical(slot))
1490 {
1491 XLogRecPtr flushptr;
1492
1493 /* make sure we have enough information to start */
1494 flushptr = LogStandbySnapshot();
1495
1496 /* and make sure it's fsynced to disk */
1497 XLogFlush(flushptr);
1498 }
1499}
1500
1501/*
1502 * Report that replication slot needs to be invalidated
1503 */
1504static void
1506 bool terminating,
1507 int pid,
1508 NameData slotname,
1509 XLogRecPtr restart_lsn,
1510 XLogRecPtr oldestLSN,
1511 TransactionId snapshotConflictHorizon)
1512{
1513 StringInfoData err_detail;
1514 bool hint = false;
1515
1516 initStringInfo(&err_detail);
1517
1518 switch (cause)
1519 {
1521 {
1522 unsigned long long ex = oldestLSN - restart_lsn;
1523
1524 hint = true;
1525 appendStringInfo(&err_detail,
1526 ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1527 "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1528 ex),
1529 LSN_FORMAT_ARGS(restart_lsn),
1530 ex);
1531 break;
1532 }
1533 case RS_INVAL_HORIZON:
1534 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1535 snapshotConflictHorizon);
1536 break;
1537
1538 case RS_INVAL_WAL_LEVEL:
1539 appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1540 break;
1541 case RS_INVAL_NONE:
1543 }
1544
1545 ereport(LOG,
1546 terminating ?
1547 errmsg("terminating process %d to release replication slot \"%s\"",
1548 pid, NameStr(slotname)) :
1549 errmsg("invalidating obsolete replication slot \"%s\"",
1550 NameStr(slotname)),
1551 errdetail_internal("%s", err_detail.data),
1552 hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0);
1553
1554 pfree(err_detail.data);
1555}
1556
1557/*
1558 * Helper for InvalidateObsoleteReplicationSlots
1559 *
1560 * Acquires the given slot and marks it invalid, if necessary and possible.
1561 *
1562 * Returns whether ReplicationSlotControlLock was released in the interim (and
1563 * in that case we're not holding the lock at return, otherwise we are).
1564 *
1565 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1566 *
1567 * This is inherently racy, because we release the LWLock
1568 * for syscalls, so caller must restart if we return true.
1569 */
1570static bool
1572 ReplicationSlot *s,
1573 XLogRecPtr oldestLSN,
1574 Oid dboid, TransactionId snapshotConflictHorizon,
1575 bool *invalidated)
1576{
1577 int last_signaled_pid = 0;
1578 bool released_lock = false;
1579 bool terminated = false;
1580 TransactionId initial_effective_xmin = InvalidTransactionId;
1581 TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
1582 XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
1584
1585 for (;;)
1586 {
1587 XLogRecPtr restart_lsn;
1588 NameData slotname;
1589 int active_pid = 0;
1591
1592 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1593
1594 if (!s->in_use)
1595 {
1596 if (released_lock)
1597 LWLockRelease(ReplicationSlotControlLock);
1598 break;
1599 }
1600
1601 /*
1602 * Check if the slot needs to be invalidated. If it needs to be
1603 * invalidated, and is not currently acquired, acquire it and mark it
1604 * as having been invalidated. We do this with the spinlock held to
1605 * avoid race conditions -- for example the restart_lsn could move
1606 * forward, or the slot could be dropped.
1607 */
1609
1610 restart_lsn = s->data.restart_lsn;
1611
1612 /* we do nothing if the slot is already invalid */
1613 if (s->data.invalidated == RS_INVAL_NONE)
1614 {
1615 /*
1616 * The slot's mutex will be released soon, and it is possible that
1617 * those values change since the process holding the slot has been
1618 * terminated (if any), so record them here to ensure that we
1619 * would report the correct invalidation cause.
1620 */
1621 if (!terminated)
1622 {
1623 initial_restart_lsn = s->data.restart_lsn;
1624 initial_effective_xmin = s->effective_xmin;
1625 initial_catalog_effective_xmin = s->effective_catalog_xmin;
1626 }
1627
1628 switch (cause)
1629 {
1631 if (initial_restart_lsn != InvalidXLogRecPtr &&
1632 initial_restart_lsn < oldestLSN)
1633 invalidation_cause = cause;
1634 break;
1635 case RS_INVAL_HORIZON:
1636 if (!SlotIsLogical(s))
1637 break;
1638 /* invalid DB oid signals a shared relation */
1639 if (dboid != InvalidOid && dboid != s->data.database)
1640 break;
1641 if (TransactionIdIsValid(initial_effective_xmin) &&
1642 TransactionIdPrecedesOrEquals(initial_effective_xmin,
1643 snapshotConflictHorizon))
1644 invalidation_cause = cause;
1645 else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1646 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1647 snapshotConflictHorizon))
1648 invalidation_cause = cause;
1649 break;
1650 case RS_INVAL_WAL_LEVEL:
1651 if (SlotIsLogical(s))
1652 invalidation_cause = cause;
1653 break;
1654 case RS_INVAL_NONE:
1656 }
1657 }
1658
1659 /*
1660 * The invalidation cause recorded previously should not change once
1661 * the process owning the slot (if any) has been terminated.
1662 */
1663 Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
1664 invalidation_cause_prev != invalidation_cause));
1665
1666 /* if there's no invalidation, we're done */
1667 if (invalidation_cause == RS_INVAL_NONE)
1668 {
1670 if (released_lock)
1671 LWLockRelease(ReplicationSlotControlLock);
1672 break;
1673 }
1674
1675 slotname = s->data.name;
1676 active_pid = s->active_pid;
1677
1678 /*
1679 * If the slot can be acquired, do so and mark it invalidated
1680 * immediately. Otherwise we'll signal the owning process, below, and
1681 * retry.
1682 */
1683 if (active_pid == 0)
1684 {
1686 s->active_pid = MyProcPid;
1687 s->data.invalidated = invalidation_cause;
1688
1689 /*
1690 * XXX: We should consider not overwriting restart_lsn and instead
1691 * just rely on .invalidated.
1692 */
1693 if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1695
1696 /* Let caller know */
1697 *invalidated = true;
1698 }
1699
1701
1702 /*
1703 * The logical replication slots shouldn't be invalidated as GUC
1704 * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
1705 * check_old_cluster_for_valid_slots() where we ensure that no slots
1706 * were invalidated before the upgrade.
1707 */
1708 Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
1709
1710 if (active_pid != 0)
1711 {
1712 /*
1713 * Prepare the sleep on the slot's condition variable before
1714 * releasing the lock, to close a possible race condition if the
1715 * slot is released before the sleep below.
1716 */
1718
1719 LWLockRelease(ReplicationSlotControlLock);
1720 released_lock = true;
1721
1722 /*
1723 * Signal to terminate the process that owns the slot, if we
1724 * haven't already signalled it. (Avoidance of repeated
1725 * signalling is the only reason for there to be a loop in this
1726 * routine; otherwise we could rely on caller's restart loop.)
1727 *
1728 * There is a race condition in which another process may own the slot
1729 * after its current owner process is terminated and before this
1730 * process owns it. To handle that, we signal only if the PID of
1731 * the owning process has changed from the previous time. (This
1732 * logic assumes that the same PID is not reused very quickly.)
1733 */
1734 if (last_signaled_pid != active_pid)
1735 {
1736 ReportSlotInvalidation(invalidation_cause, true, active_pid,
1737 slotname, restart_lsn,
1738 oldestLSN, snapshotConflictHorizon);
1739
1740 if (MyBackendType == B_STARTUP)
1741 (void) SendProcSignal(active_pid,
1744 else
1745 (void) kill(active_pid, SIGTERM);
1746
1747 last_signaled_pid = active_pid;
1748 terminated = true;
1749 invalidation_cause_prev = invalidation_cause;
1750 }
1751
1752 /* Wait until the slot is released. */
1754 WAIT_EVENT_REPLICATION_SLOT_DROP);
1755
1756 /*
1757 * Re-acquire lock and start over; we expect to invalidate the
1758 * slot next time (unless another process acquires the slot in the
1759 * meantime).
1760 */
1761 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1762 continue;
1763 }
1764 else
1765 {
1766 /*
1767 * We hold the slot now and have already invalidated it; flush it
1768 * to ensure that state persists.
1769 *
1770 * Don't want to hold ReplicationSlotControlLock across file
1771 * system operations, so release it now but be sure to tell caller
1772 * to restart from scratch.
1773 */
1774 LWLockRelease(ReplicationSlotControlLock);
1775 released_lock = true;
1776
1777 /* Make sure the invalidated state persists across server restart */
1781
1782 ReportSlotInvalidation(invalidation_cause, false, active_pid,
1783 slotname, restart_lsn,
1784 oldestLSN, snapshotConflictHorizon);
1785
1786 /* done with this slot for now */
1787 break;
1788 }
1789 }
1790
1791 Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1792
1793 return released_lock;
1794}
1795
1796/*
1797 * Invalidate slots that require resources about to be removed.
1798 *
1799 * Returns true if any slot was invalidated.
1800 *
1801 * Whether a slot needs to be invalidated depends on the cause. A slot is
1802 * invalidated if it:
1803 * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1804 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1805 * db; dboid may be InvalidOid for shared relations
1806 * - RS_INVAL_WAL_LEVEL: is logical
1807 *
1808 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1809 */
1810bool
1812 XLogSegNo oldestSegno, Oid dboid,
1813 TransactionId snapshotConflictHorizon)
1814{
1815 XLogRecPtr oldestLSN;
1816 bool invalidated = false;
1817
1818 Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1819 Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1820 Assert(cause != RS_INVAL_NONE);
1821
1822 if (max_replication_slots == 0)
1823 return invalidated;
1824
1825 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1826
1827restart:
1828 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1829 for (int i = 0; i < max_replication_slots; i++)
1830 {
1832
1833 if (!s->in_use)
1834 continue;
1835
1836 if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1837 snapshotConflictHorizon,
1838 &invalidated))
1839 {
1840 /* if the lock was released, start from scratch */
1841 goto restart;
1842 }
1843 }
1844 LWLockRelease(ReplicationSlotControlLock);
1845
1846 /*
1847 * If any slots have been invalidated, recalculate the resource limits.
1848 */
1849 if (invalidated)
1850 {
1853 }
1854
1855 return invalidated;
1856}
1857
1858/*
1859 * Flush all replication slots to disk.
1860 *
1861 * It is convenient to flush dirty replication slots at the time of checkpoint.
1862 * Additionally, in case of a shutdown checkpoint, we also identify the slots
1863 * for which the confirmed_flush LSN has been updated since the last time it
1864 * was saved and flush them.
1865 */
1866void
1868{
1869 int i;
1870
1871 elog(DEBUG1, "performing replication slot checkpoint");
1872
1873 /*
1874 * Prevent any slot from being created/dropped while we're active. As we
1875 * explicitly do *not* want to block iterating over replication_slots or
1876 * acquiring a slot we cannot take the control lock - but that's OK,
1877 * because holding ReplicationSlotAllocationLock is strictly stronger, and
1878 * enough to guarantee that nobody can change the in_use bits on us.
1879 */
1880 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1881
1882 for (i = 0; i < max_replication_slots; i++)
1883 {
1885 char path[MAXPGPATH];
1886
1887 if (!s->in_use)
1888 continue;
1889
1890 /* save the slot to disk, locking is handled in SaveSlotToPath() */
1891 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
1892
1893 /*
1894 * Slot's data is not flushed each time the confirmed_flush LSN is
1895 * updated as that could lead to frequent writes. However, we decide
1896 * to force a flush of all logical slots' data at the time of shutdown
1897 * if the confirmed_flush LSN is changed since we last flushed it to
1898 * disk. This helps in avoiding an unnecessary retreat of the
1899 * confirmed_flush LSN after restart.
1900 */
1901 if (is_shutdown && SlotIsLogical(s))
1902 {
1904
1905 if (s->data.invalidated == RS_INVAL_NONE &&
1907 {
1908 s->just_dirtied = true;
1909 s->dirty = true;
1910 }
1912 }
1913
1914 SaveSlotToPath(s, path, LOG);
1915 }
1916 LWLockRelease(ReplicationSlotAllocationLock);
1917}
1918
1919/*
1920 * Load all replication slots from disk into memory at server startup. This
1921 * needs to be run before we start crash recovery.
1922 */
1923void
1925{
1926 DIR *replication_dir;
1927 struct dirent *replication_de;
1928
1929 elog(DEBUG1, "starting up replication slots");
1930
1931 /* restore all slots by iterating over all on-disk entries */
1932 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
1933 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
1934 {
1935 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
1936 PGFileType de_type;
1937
1938 if (strcmp(replication_de->d_name, ".") == 0 ||
1939 strcmp(replication_de->d_name, "..") == 0)
1940 continue;
1941
1942 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
1943 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1944
1945 /* we're only creating directories here, skip if it's not ours */
1946 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
1947 continue;
1948
1949 /* we crashed while a slot was being setup or deleted, clean up */
1950 if (pg_str_endswith(replication_de->d_name, ".tmp"))
1951 {
1952 if (!rmtree(path, true))
1953 {
1955 (errmsg("could not remove directory \"%s\"",
1956 path)));
1957 continue;
1958 }
1960 continue;
1961 }
1962
1963 /* looks like a slot in a normal state, restore */
1964 RestoreSlotFromDisk(replication_de->d_name);
1965 }
1966 FreeDir(replication_dir);
1967
1968 /* currently no slots exist, we're done. */
1969 if (max_replication_slots <= 0)
1970 return;
1971
1972 /* Now that we have recovered all the data, compute replication xmin */
1975}
1976
1977/* ----
1978 * Manipulation of on-disk state of replication slots
1979 *
1980 * NB: none of the routines below should take any notice whether a slot is the
1981 * current one or not, that's all handled a layer above.
1982 * ----
1983 */
1984static void
1986{
1987 char tmppath[MAXPGPATH];
1988 char path[MAXPGPATH];
1989 struct stat st;
1990
1991 /*
1992 * No need to take out the io_in_progress_lock, nobody else can see this
1993 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1994 * takes out the lock, if we'd take the lock here, we'd deadlock.
1995 */
1996
1997 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1998 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1999
2000 /*
2001 * It's just barely possible that some previous effort to create or drop a
2002 * slot with this name left a temp directory lying around. If that seems
2003 * to be the case, try to remove it. If the rmtree() fails, we'll error
2004 * out at the MakePGDirectory() below, so we don't bother checking
2005 * success.
2006 */
2007 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2008 rmtree(tmppath, true);
2009
2010 /* Create and fsync the temporary slot directory. */
2011 if (MakePGDirectory(tmppath) < 0)
2012 ereport(ERROR,
2014 errmsg("could not create directory \"%s\": %m",
2015 tmppath)));
2016 fsync_fname(tmppath, true);
2017
2018 /* Write the actual state file. */
2019 slot->dirty = true; /* signal that we really need to write */
2020 SaveSlotToPath(slot, tmppath, ERROR);
2021
2022 /* Rename the directory into place. */
2023 if (rename(tmppath, path) != 0)
2024 ereport(ERROR,
2026 errmsg("could not rename file \"%s\" to \"%s\": %m",
2027 tmppath, path)));
2028
2029 /*
2030 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2031 * would persist after an OS crash or not - so, force a restart. The
2032 * restart would try to fsync this again till it works.
2033 */
2035
2036 fsync_fname(path, true);
2038
2040}
2041
2042/*
2043 * Shared functionality between saving and creating a replication slot.
2044 */
2045static void
2046SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2047{
2048 char tmppath[MAXPGPATH];
2049 char path[MAXPGPATH];
2050 int fd;
2052 bool was_dirty;
2053
2054 /* first check whether there's something to write out */
2055 SpinLockAcquire(&slot->mutex);
2056 was_dirty = slot->dirty;
2057 slot->just_dirtied = false;
2058 SpinLockRelease(&slot->mutex);
2059
2060 /* and don't do anything if there's nothing to write */
2061 if (!was_dirty)
2062 return;
2063
2065
2066 /* silence valgrind :( */
2067 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2068
2069 sprintf(tmppath, "%s/state.tmp", dir);
2070 sprintf(path, "%s/state", dir);
2071
2072 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2073 if (fd < 0)
2074 {
2075 /*
2076 * If not an ERROR, then release the lock before returning. In case
2077 * of an ERROR, the error recovery path automatically releases the
2078 * lock, but no harm in explicitly releasing even in that case. Note
2079 * that LWLockRelease() could affect errno.
2080 */
2081 int save_errno = errno;
2082
2084 errno = save_errno;
2085 ereport(elevel,
2087 errmsg("could not create file \"%s\": %m",
2088 tmppath)));
2089 return;
2090 }
2091
2092 cp.magic = SLOT_MAGIC;
2094 cp.version = SLOT_VERSION;
2096
2097 SpinLockAcquire(&slot->mutex);
2098
2099 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2100
2101 SpinLockRelease(&slot->mutex);
2102
2106 FIN_CRC32C(cp.checksum);
2107
2108 errno = 0;
2109 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2110 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2111 {
2112 int save_errno = errno;
2113
2117
2118 /* if write didn't set errno, assume problem is no disk space */
2119 errno = save_errno ? save_errno : ENOSPC;
2120 ereport(elevel,
2122 errmsg("could not write to file \"%s\": %m",
2123 tmppath)));
2124 return;
2125 }
2127
2128 /* fsync the temporary file */
2129 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2130 if (pg_fsync(fd) != 0)
2131 {
2132 int save_errno = errno;
2133
2137 errno = save_errno;
2138 ereport(elevel,
2140 errmsg("could not fsync file \"%s\": %m",
2141 tmppath)));
2142 return;
2143 }
2145
2146 if (CloseTransientFile(fd) != 0)
2147 {
2148 int save_errno = errno;
2149
2151 errno = save_errno;
2152 ereport(elevel,
2154 errmsg("could not close file \"%s\": %m",
2155 tmppath)));
2156 return;
2157 }
2158
2159 /* rename to permanent file, fsync file and directory */
2160 if (rename(tmppath, path) != 0)
2161 {
2162 int save_errno = errno;
2163
2165 errno = save_errno;
2166 ereport(elevel,
2168 errmsg("could not rename file \"%s\" to \"%s\": %m",
2169 tmppath, path)));
2170 return;
2171 }
2172
2173 /*
2174 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2175 */
2177
2178 fsync_fname(path, false);
2179 fsync_fname(dir, true);
2181
2183
2184 /*
2185 * Successfully wrote: unset the dirty bit, unless somebody dirtied the
2186 * slot again already, and remember the confirmed_flush LSN value.
2187 */
2188 SpinLockAcquire(&slot->mutex);
2189 if (!slot->just_dirtied)
2190 slot->dirty = false;
2192 SpinLockRelease(&slot->mutex);
2193
2195}
2196
2197/*
2198 * Load a single slot from disk into memory.
2199 */
2200static void
2202{
2204 int i;
2205 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2206 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2207 int fd;
2208 bool restored = false;
2209 int readBytes;
2210 pg_crc32c checksum;
2211
2212 /* no need to lock here, no concurrent access allowed yet */
2213
2214 /* delete temp file if it exists */
2215 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2216 sprintf(path, "%s/state.tmp", slotdir);
2217 if (unlink(path) < 0 && errno != ENOENT)
2218 ereport(PANIC,
2220 errmsg("could not remove file \"%s\": %m", path)));
2221
2222 sprintf(path, "%s/state", slotdir);
2223
2224 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2225
2226 /* on some operating systems fsyncing a file requires O_RDWR */
2227 fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2228
2229 /*
2230 * We do not need to handle this as we are rename()ing the directory into
2231 * place only after we fsync()ed the state file.
2232 */
2233 if (fd < 0)
2234 ereport(PANIC,
2236 errmsg("could not open file \"%s\": %m", path)));
2237
2238 /*
2239 * Sync state file before we're reading from it. We might have crashed
2240 * while it wasn't synced yet and we shouldn't continue on that basis.
2241 */
2242 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2243 if (pg_fsync(fd) != 0)
2244 ereport(PANIC,
2246 errmsg("could not fsync file \"%s\": %m",
2247 path)));
2249
2250 /* Also sync the parent directory */
2252 fsync_fname(slotdir, true);
2254
2255 /* read part of statefile that's guaranteed to be version independent */
2256 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2257 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2259 if (readBytes != ReplicationSlotOnDiskConstantSize)
2260 {
2261 if (readBytes < 0)
2262 ereport(PANIC,
2264 errmsg("could not read file \"%s\": %m", path)));
2265 else
2266 ereport(PANIC,
2268 errmsg("could not read file \"%s\": read %d of %zu",
2269 path, readBytes,
2271 }
2272
2273 /* verify magic */
2274 if (cp.magic != SLOT_MAGIC)
2275 ereport(PANIC,
2277 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2278 path, cp.magic, SLOT_MAGIC)));
2279
2280 /* verify version */
2281 if (cp.version != SLOT_VERSION)
2282 ereport(PANIC,
2284 errmsg("replication slot file \"%s\" has unsupported version %u",
2285 path, cp.version)));
2286
2287 /* boundary check on length */
2289 ereport(PANIC,
2291 errmsg("replication slot file \"%s\" has corrupted length %u",
2292 path, cp.length)));
2293
2294 /* Now that we know the size, read the entire file */
2295 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2296 readBytes = read(fd,
2297 (char *) &cp + ReplicationSlotOnDiskConstantSize,
2298 cp.length);
2300 if (readBytes != cp.length)
2301 {
2302 if (readBytes < 0)
2303 ereport(PANIC,
2305 errmsg("could not read file \"%s\": %m", path)));
2306 else
2307 ereport(PANIC,
2309 errmsg("could not read file \"%s\": read %d of %zu",
2310 path, readBytes, (Size) cp.length)));
2311 }
2312
2313 if (CloseTransientFile(fd) != 0)
2314 ereport(PANIC,
2316 errmsg("could not close file \"%s\": %m", path)));
2317
2318 /* now verify the CRC */
2319 INIT_CRC32C(checksum);
2320 COMP_CRC32C(checksum,
2323 FIN_CRC32C(checksum);
2324
2325 if (!EQ_CRC32C(checksum, cp.checksum))
2326 ereport(PANIC,
2327 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2328 path, checksum, cp.checksum)));
2329
2330 /*
2331 * If we crashed with an ephemeral slot active, don't restore but delete
2332 * it.
2333 */
2335 {
2336 if (!rmtree(slotdir, true))
2337 {
2339 (errmsg("could not remove directory \"%s\"",
2340 slotdir)));
2341 }
2343 return;
2344 }
2345
2346 /*
2347 * Verify that requirements for the specific slot type are met. That's
2348 * important because if these aren't met we're not guaranteed to retain
2349 * all the necessary resources for the slot.
2350 *
2351 * NB: We have to do so *after* the above checks for ephemeral slots,
2352 * because otherwise a slot that shouldn't exist anymore could prevent
2353 * restarts.
2354 *
2355 * NB: Changing the requirements here also requires adapting
2356 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2357 */
2359 ereport(FATAL,
2360 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2361 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2362 NameStr(cp.slotdata.name)),
2363 errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2364 else if (wal_level < WAL_LEVEL_REPLICA)
2365 ereport(FATAL,
2366 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2367 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2368 NameStr(cp.slotdata.name)),
2369 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2370
2371 /* nothing can be active yet, don't lock anything */
2372 for (i = 0; i < max_replication_slots; i++)
2373 {
2374 ReplicationSlot *slot;
2375
2377
2378 if (slot->in_use)
2379 continue;
2380
2381 /* restore the entire set of persistent data */
2382 memcpy(&slot->data, &cp.slotdata,
2384
2385 /* initialize in memory state */
2386 slot->effective_xmin = cp.slotdata.xmin;
2389
2394
2395 slot->in_use = true;
2396 slot->active_pid = 0;
2397
2398 /*
2399 * Set the time since the slot has become inactive after loading the
2400 * slot from the disk into memory. Whoever acquires the slot i.e.
2401 * makes the slot active will reset it.
2402 */
2404
2405 restored = true;
2406 break;
2407 }
2408
2409 if (!restored)
2410 ereport(FATAL,
2411 (errmsg("too many replication slots active before shutdown"),
2412 errhint("Increase \"max_replication_slots\" and try again.")));
2413}
2414
2415/*
2416 * Maps an invalidation reason for a replication slot to
2417 * ReplicationSlotInvalidationCause.
 *
 * invalidation_reason must be non-NULL (asserted). It is compared with
 * strcmp() against each entry of SlotInvalidationCauses[], from index
 * RS_INVAL_NONE through RS_INVAL_MAX_CAUSES inclusive, and the index of the
 * first exact match is returned.
 *
 * A string that matches no entry trips Assert(found) in assert-enabled
 * builds.
 *
 * NOTE(review): listing lines 2419 and 2422-2423 (the return type and the
 * declarations of "cause" and "result") are absent from this extract, so
 * what "result" holds when nothing matches in a non-assert build cannot be
 * confirmed from here.
2418 */
2420GetSlotInvalidationCause(const char *invalidation_reason)
2421{
2424 bool found PG_USED_FOR_ASSERTS_ONLY = false;
2425
2426 Assert(invalidation_reason);
2427
/*
 * The upper bound is inclusive: RS_INVAL_MAX_CAUSES is itself a valid index,
 * since the table is statically asserted to have RS_INVAL_MAX_CAUSES + 1
 * entries.
 */
2428 for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
2429 {
2430 if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
2431 {
2432 found = true;
2433 result = cause;
2434 break;
2435 }
2436 }
2437
2438 Assert(found);
2439 return result;
2440}
2441
2442/*
2443 * A helper function to validate slots specified in GUC synchronized_standby_slots.
2444 *
2445 * The rawname will be parsed, and the result will be saved into *elemlist.
2446 */
2447static bool
2448validate_sync_standby_slots(char *rawname, List **elemlist)
2449{
2450 bool ok;
2451
2452 /* Verify syntax and parse string into a list of identifiers */
2453 ok = SplitIdentifierString(rawname, ',', elemlist);
2454
2455 if (!ok)
2456 {
2457 GUC_check_errdetail("List syntax is invalid.");
2458 }
2459 else if (MyProc)
2460 {
2461 /*
2462 * Check that each specified slot exist and is physical.
2463 *
2464 * Because we need an LWLock, we cannot do this on processes without a
2465 * PGPROC, so we skip it there; but see comments in
2466 * StandbySlotsHaveCaughtup() as to why that's not a problem.
2467 */
2468 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2469
2470 foreach_ptr(char, name, *elemlist)
2471 {
2472 ReplicationSlot *slot;
2473
2474 slot = SearchNamedReplicationSlot(name, false);
2475
2476 if (!slot)
2477 {
2478 GUC_check_errdetail("Replication slot \"%s\" does not exist.",
2479 name);
2480 ok = false;
2481 break;
2482 }
2483
2484 if (!SlotIsPhysical(slot))
2485 {
2486 GUC_check_errdetail("\"%s\" is not a physical replication slot.",
2487 name);
2488 ok = false;
2489 break;
2490 }
2491 }
2492
2493 LWLockRelease(ReplicationSlotControlLock);
2494 }
2495
2496 return ok;
2497}
2498
2499/*
2500 * GUC check_hook for synchronized_standby_slots
 *
 * Per the cross-reference for this file, the signature is
 * check_synchronized_standby_slots(char **newval, void **extra,
 * GucSource source); that line (2503) is absent from this extract.
 *
 * An empty string is accepted immediately, without touching *extra.
 * Otherwise a private copy of *newval is validated by
 * validate_sync_standby_slots(). If validation fails, or yields an empty
 * list, the temporaries are freed and the validation result is returned,
 * again without setting *extra.
 *
 * On success the slot names are packed back to back, each with its
 * terminating NUL, into a SyncStandbySlotsConfigData whose nslotnames field
 * holds the number of names, and that struct is handed back through *extra.
2501 */
2502bool
2504{
2505 char *rawname;
2506 char *ptr;
2507 List *elemlist;
2508 int size;
2509 bool ok;
2511
2512 if ((*newval)[0] == '\0')
2513 return true;
2514
2515 /* Need a modifiable copy of the GUC string */
2516 rawname = pstrdup(*newval);
2517
2518 /* Now verify if the specified slots exist and have correct type */
2519 ok = validate_sync_standby_slots(rawname, &elemlist);
2520
2521 if (!ok || elemlist == NIL)
2522 {
2523 pfree(rawname);
2524 list_free(elemlist);
2525 return ok;
2526 }
2527
2528 /* Compute the size required for the SyncStandbySlotsConfigData struct */
2529 size = offsetof(SyncStandbySlotsConfigData, slot_names);
2530 foreach_ptr(char, slot_name, elemlist)
2531 size += strlen(slot_name) + 1;
2532
2533 /* GUC extra value must be guc_malloc'd, not palloc'd */
/*
 * NOTE(review): the allocation statement (listing line 2534) and the
 * declaration of "config" (line 2510) are absent from this extract. "config"
 * is dereferenced on the very next statement with no visible NULL check;
 * confirm that an allocation failure cannot reach that dereference.
 */
2535
2536 /* Transform the data into SyncStandbySlotsConfigData */
2537 config->nslotnames = list_length(elemlist);
2538
/* Lay the names out consecutively; each copy includes its trailing NUL. */
2539 ptr = config->slot_names;
2540 foreach_ptr(char, slot_name, elemlist)
2541 {
2542 strcpy(ptr, slot_name);
2543 ptr += strlen(slot_name) + 1;
2544 }
2545
2546 *extra = config;
2547
2548 pfree(rawname);
2549 list_free(elemlist);
2550 return true;
2551}
2552
2553/*
2554 * GUC assign_hook for synchronized_standby_slots
 *
 * Per the cross-reference for this file, the signature is
 * assign_synchronized_standby_slots(const char *newval, void *extra).
 *
 * NOTE(review): the signature line (2557) and both statements of the body
 * (listing lines 2563 and 2565) are absent from this extract; only the
 * explanatory comment survives, so the exact state updated here cannot be
 * confirmed from this view.
2555 */
2556void
2558{
2559 /*
2560 * The standby slots may have changed, so we must recompute the oldest
2561 * LSN.
2562 */
2564
2566}
2567
2568/*
2569 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
 *
 * Returns true if slot_name exactly matches (strcmp) one of the names held
 * in synchronized_standby_slots_config, false otherwise.
2570 */
2571bool
2572SlotExistsInSyncStandbySlots(const char *slot_name)
2573{
2574 const char *standby_slot_name;
2575
2576 /* Return false if there is no value in synchronized_standby_slots */
/*
 * NOTE(review): the condition guarding this return (listing line 2577) is
 * absent from this extract.
 */
2578 return false;
2579
2580 /*
2581 * XXX: We are not expecting this list to be long so a linear search
2582 * shouldn't hurt but if that turns out not to be true then we can cache
2583 * this information for each WalSender as well.
2584 */
2585 standby_slot_name = synchronized_standby_slots_config->slot_names;
2586 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2587 {
2588 if (strcmp(standby_slot_name, slot_name) == 0)
2589 return true;
2590
/* Names are stored back to back; step past this one and its NUL. */
2591 standby_slot_name += strlen(standby_slot_name) + 1;
2592 }
2593
2594 return false;
2595}
2596
2597/*
2598 * Return true if the slots specified in synchronized_standby_slots have caught up to
2599 * the given WAL location, false otherwise.
2600 *
2601 * The elevel parameter specifies the error level used for logging messages
2602 * related to slots that do not exist, are invalidated, or are inactive.
 *
 * The configured slots are examined in order while holding
 * ReplicationSlotControlLock in shared mode. The scan stops at the first
 * slot that is missing, is logical, has been invalidated, or whose
 * restart_lsn is invalid or behind wait_for_lsn; in all of those cases the
 * function returns false.
 *
 * When every configured slot has caught up, ss_oldest_flush_lsn is updated
 * to the smallest restart_lsn seen, so that later calls asking for an
 * already-covered location can (per the comment on the early exit below)
 * return without rescanning.
2603 */
2604bool
2605StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
2606{
2607 const char *name;
2608 int caught_up_slot_num = 0;
2609 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2610
2611 /*
2612 * Don't need to wait for the standbys to catch up if there is no value in
2613 * synchronized_standby_slots.
2614 */
/*
 * NOTE(review): the condition guarding this return (listing line 2615) is
 * absent from this extract.
 */
2616 return true;
2617
2618 /*
2619 * Don't need to wait for the standbys to catch up if we are on a standby
2620 * server, since we do not support syncing slots to cascading standbys.
2621 */
2622 if (RecoveryInProgress())
2623 return true;
2624
2625 /*
2626 * Don't need to wait for the standbys to catch up if they are already
2627 * beyond the specified WAL location.
2628 */
/*
 * NOTE(review): the first half of this condition (listing line 2629) is
 * absent from this extract; only its trailing operand is visible.
 */
2630 ss_oldest_flush_lsn >= wait_for_lsn)
2631 return true;
2632
2633 /*
2634 * To prevent concurrent slot dropping and creation while filtering the
2635 * slots, take the ReplicationSlotControlLock outside of the loop.
2636 */
2637 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2638
/*
 * NOTE(review): listing line 2639 is absent from this extract; presumably
 * it points "name" at the first configured slot name, since "name" is used
 * by the loop body without any other visible initialization.
 */
2640 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2641 {
2642 XLogRecPtr restart_lsn;
2643 bool invalidated;
2644 bool inactive;
2645 ReplicationSlot *slot;
2646
2647 slot = SearchNamedReplicationSlot(name, false);
2648
2649 /*
2650 * If a slot name provided in synchronized_standby_slots does not
2651 * exist, report a message and exit the loop.
2652 *
2653 * Though validate_sync_standby_slots (the GUC check_hook) tries to
2654 * avoid this, it can nonetheless happen because the user can specify
2655 * a nonexistent slot name before server startup. That function cannot
2656 * validate such a slot during startup, as ReplicationSlotCtl is not
2657 * initialized by then. Also, the user might have dropped one slot.
2658 */
2659 if (!slot)
2660 {
2661 ereport(elevel,
2662 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2663 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2664 name, "synchronized_standby_slots"),
2665 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2666 name),
2667 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2668 name, "synchronized_standby_slots"));
2669 break;
2670 }
2671
2672 /* Same as above: if a slot is not physical, exit the loop. */
2673 if (SlotIsLogical(slot))
2674 {
2675 ereport(elevel,
2676 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2677 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2678 name, "synchronized_standby_slots"),
2679 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2680 name),
2681 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2682 name, "synchronized_standby_slots"));
2683 break;
2684 }
2685
/* Read the fields of interest together while holding the slot's mutex. */
2686 SpinLockAcquire(&slot->mutex);
2687 restart_lsn = slot->data.restart_lsn;
2688 invalidated = slot->data.invalidated != RS_INVAL_NONE;
2689 inactive = slot->active_pid == 0;
2690 SpinLockRelease(&slot->mutex);
2691
2692 if (invalidated)
2693 {
2694 /* Specified physical slot has been invalidated */
2695 ereport(elevel,
2696 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2697 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2698 name, "synchronized_standby_slots"),
2699 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2700 name),
2701 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
2702 name, "synchronized_standby_slots"));
2703 break;
2704 }
2705
2706 if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2707 {
2708 /* Log a message if no active_pid for this physical slot */
2709 if (inactive)
2710 ereport(elevel,
2711 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2712 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
2713 name, "synchronized_standby_slots"),
2714 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2715 name),
2716 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
2717 name, "synchronized_standby_slots"));
2718
2719 /* Stop scanning: this slot hasn't caught up, so the answer is false. */
2720 break;
2721 }
2722
2723 Assert(restart_lsn >= wait_for_lsn);
2724
/* Track the smallest restart_lsn among the slots that have caught up. */
2725 if (XLogRecPtrIsInvalid(min_restart_lsn) ||
2726 min_restart_lsn > restart_lsn)
2727 min_restart_lsn = restart_lsn;
2728
2729 caught_up_slot_num++;
2730
/* Names are stored back to back; step past this one and its NUL. */
2731 name += strlen(name) + 1;
2732 }
2733
2734 LWLockRelease(ReplicationSlotControlLock);
2735
2736 /*
2737 * Return false if not all the standbys have caught up to the specified
2738 * WAL location.
2739 */
2740 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
2741 return false;
2742
2743 /* The ss_oldest_flush_lsn must not retreat. */
/*
 * NOTE(review): the opening of this statement (listing line 2744) is absent
 * from this extract; only its final operand is visible.
 */
2745 min_restart_lsn >= ss_oldest_flush_lsn);
2746
2747 ss_oldest_flush_lsn = min_restart_lsn;
2748
2749 return true;
2750}
2751
2752/*
2753 * Wait for physical standbys to confirm receiving the given lsn.
2754 *
2755 * Used by logical decoding SQL functions. It waits for physical standbys
2756 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 *
 * Per the cross-reference for this file, the signature is
 * WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn).
 *
 * The visible part of the body loops until
 * StandbySlotsHaveCaughtup(wait_for_lsn, WARNING) returns true, so problems
 * with the configured slots are reported at WARNING level on each pass.
 *
 * NOTE(review): the signature line (2759) and several statements (listing
 * lines 2766, 2769, 2773, 2775, 2778, 2790 and 2794) are absent from this
 * extract, so the early-exit condition, the handling around
 * ConfigReloadPending and the sleep call itself cannot be confirmed from
 * this view.
2757 */
2758void
2760{
2761 /*
2762 * Don't need to wait for the standby to catch up if the current acquired
2763 * slot is not a logical failover slot, or there is no value in
2764 * synchronized_standby_slots.
2765 */
2767 return;
2768
2770
2771 for (;;)
2772 {
2774
2776 {
2777 ConfigReloadPending = false;
2779 }
2780
2781 /* Exit if done waiting for every slot. */
2782 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
2783 break;
2784
2785 /*
2786 * Wait for the slots in the synchronized_standby_slots to catch up,
2787 * but use a timeout (1s) so we can also check if the
2788 * synchronized_standby_slots has been changed.
2789 */
2791 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
2792 }
2793
2795}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
#define NameStr(name)
Definition: c.h:700
#define ngettext(s, p, n)
Definition: c.h:1135
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:201
#define Assert(condition)
Definition: c.h:812
#define PG_BINARY
Definition: c.h:1227
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:417
#define pg_unreachable()
Definition: c.h:315
uint32_t uint32
Definition: c.h:485
#define lengthof(array)
Definition: c.h:742
#define MemSet(start, val, len)
Definition: c.h:974
uint32 TransactionId
Definition: c.h:606
size_t Size
Definition: c.h:559
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
int errcode_for_file_access(void)
Definition: elog.c:876
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define _(x)
Definition: elog.c:90
#define LOG
Definition: elog.h:31
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3936
int FreeDir(DIR *dir)
Definition: fd.c:2983
int CloseTransientFile(int fd)
Definition: fd.c:2831
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:755
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2865
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2931
int pg_fsync(int fd)
Definition: fd.c:385
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2655
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:526
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
bool IsBinaryUpgrade
Definition: globals.c:120
int MyProcPid
Definition: globals.c:46
bool IsUnderPostmaster
Definition: globals.c:119
Oid MyDatabaseId
Definition: globals.c:93
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
void * guc_malloc(int elevel, size_t size)
Definition: guc.c:638
#define newval
#define GUC_check_errdetail
Definition: guc.h:476
GucSource
Definition: guc.h:108
@ PGC_SIGHUP
Definition: guc.h:71
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
int i
Definition: isn.c:72
void list_free(List *list)
Definition: list.c:1546
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1893
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1937
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:707
@ LWTRANCHE_REPLICATION_SLOT_IO
Definition: lwlock.h:189
@ LW_SHARED
Definition: lwlock.h:115
@ LW_EXCLUSIVE
Definition: lwlock.h:114
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
@ B_STARTUP
Definition: miscadmin.h:363
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
Oid GetUserId(void)
Definition: miscinit.c:517
BackendType MyBackendType
Definition: miscinit.c:64
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:736
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
static rewind_source * source
Definition: pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3943
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:281
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition: procsignal.h:46
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
static pg_noinline void Size size
Definition: slab.c:607
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:497
char * synchronized_standby_slots
Definition: slot.c:148
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition: slot.c:2557
#define ReplicationSlotOnDiskChecksummedSize
Definition: slot.c:125
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:1867
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:309
void ReplicationSlotDropAcquired(void)
Definition: slot.c:896
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1038
void ReplicationSlotReserveWal(void)
Definition: slot.c:1429
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1240
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:540
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:1811
static XLogRecPtr ss_oldest_flush_lsn
Definition: slot.c:157
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason)
Definition: slot.c:2420
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1298
#define ReplicationSlotOnDiskNotChecksummedSize
Definition: slot.c:122
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1182
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1077
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:2201
#define RS_INVAL_MAX_CAUSES
Definition: slot.c:113
void ReplicationSlotPersist(void)
Definition: slot.c:1055
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:2046
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:784
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2572
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition: slot.c:2448
void ReplicationSlotSave(void)
Definition: slot.c:1020
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:464
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:1985
#define ReplicationSlotOnDiskV2Size
Definition: slot.c:128
void CheckSlotPermissions(void)
Definition: slot.c:1412
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:513
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition: slot.c:2503
void ReplicationSlotsShmemInit(void)
Definition: slot.c:189
const char *const SlotInvalidationCauses[]
Definition: slot.c:105
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:807
void ReplicationSlotRelease(void)
Definition: slot.c:652
int max_replication_slots
Definition: slot.c:141
StaticAssertDecl(lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:135
#define SLOT_VERSION
Definition: slot.c:132
struct ReplicationSlotOnDisk ReplicationSlotOnDisk
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition: slot.c:2759
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2605
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1133
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
void ReplicationSlotInitialize(void)
Definition: slot.c:224
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:913
static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
Definition: slot.c:1571
void StartupReplicationSlots(void)
Definition: slot.c:1924
void CheckSlotRequirements(void)
Definition: slot.c:1390
#define SLOT_MAGIC
Definition: slot.c:131
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon)
Definition: slot.c:1505
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition: slot.c:151
#define ReplicationSlotOnDiskConstantSize
Definition: slot.c:119
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:171
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:233
ReplicationSlotPersistency
Definition: slot.h:37
@ RS_PERSISTENT
Definition: slot.h:38
@ RS_EPHEMERAL
Definition: slot.h:39
@ RS_TEMPORARY
Definition: slot.h:40
#define SlotIsPhysical(slot)
Definition: slot.h:216
#define PG_REPLSLOT_DIR
Definition: slot.h:21
ReplicationSlotInvalidationCause
Definition: slot.h:51
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:54
@ RS_INVAL_HORIZON
Definition: slot.h:56
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:58
@ RS_INVAL_NONE
Definition: slot.h:52
#define SlotIsLogical(slot)
Definition: slot.h:217
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1649
#define SpinLockInit(lock)
Definition: spin.h:57
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1281
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:94
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:179
void initStringInfo(StringInfo str)
Definition: stringinfo.c:56
Definition: dirent.c:26
Definition: pg_list.h:54
uint8 statusFlags
Definition: proc.h:242
int pgxactoff
Definition: proc.h:184
uint8 * statusFlags
Definition: proc.h:399
ReplicationSlot replication_slots[1]
Definition: slot.h:228
uint32 version
Definition: slot.c:73
ReplicationSlotPersistentData slotdata
Definition: slot.c:81
pg_crc32c checksum
Definition: slot.c:70
TransactionId xmin
Definition: slot.h:85
TransactionId catalog_xmin
Definition: slot.h:93
XLogRecPtr restart_lsn
Definition: slot.h:96
XLogRecPtr confirmed_flush
Definition: slot.h:107
ReplicationSlotPersistency persistency
Definition: slot.h:77
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:99
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:197
TransactionId effective_catalog_xmin
Definition: slot.h:178
slock_t mutex
Definition: slot.h:154
XLogRecPtr candidate_restart_valid
Definition: slot.h:198
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:206
pid_t active_pid
Definition: slot.h:160
bool in_use
Definition: slot.h:157
TransactionId effective_xmin
Definition: slot.h:177
bool just_dirtied
Definition: slot.h:163
XLogRecPtr candidate_restart_lsn
Definition: slot.h:199
LWLock io_in_progress_lock
Definition: slot.h:184
ConditionVariable active_cv
Definition: slot.h:187
TransactionId candidate_catalog_xmin
Definition: slot.h:196
bool dirty
Definition: slot.h:164
ReplicationSlotPersistentData data
Definition: slot.h:181
TimestampTz inactive_since
Definition: slot.h:213
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition: slot.c:99
ConditionVariable wal_confirm_rcv_cv
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
Definition: type.h:96
Definition: c.h:695
unsigned short st_mode
Definition: win32_port.h:268
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3432
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
const char * name
bool am_walsender
Definition: walsender.c:115
bool log_replication_commands
Definition: walsender.c:125
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define stat
Definition: win32_port.h:284
#define S_ISDIR(m)
Definition: win32_port.h:325
#define kill(pid, sig)
Definition: win32_port.h:503
bool RecoveryInProgress(void)
Definition: xlog.c:6334
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3758
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6437
int wal_level
Definition: xlog.c:131
int wal_segment_size
Definition: xlog.c:143
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2688
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9435
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2802
@ WAL_LEVEL_REPLICA
Definition: xlog.h:75
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:76
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)