PostgreSQL Source Code  git master
slot.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  * Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster. Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups). The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot. The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
36 
37 #include "postgres.h"
38 
39 #include <unistd.h>
40 #include <sys/stat.h>
41 
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "access/xlogrecovery.h"
45 #include "common/file_utils.h"
46 #include "common/string.h"
47 #include "miscadmin.h"
48 #include "pgstat.h"
49 #include "replication/slot.h"
50 #include "storage/fd.h"
51 #include "storage/ipc.h"
52 #include "storage/proc.h"
53 #include "storage/procarray.h"
54 #include "utils/builtins.h"
55 
56 /*
57  * Replication slot on-disk data structure.
58  */
59 typedef struct ReplicationSlotOnDisk
60 {
61  /* first part of this struct needs to be version independent */
62 
63  /* data not covered by checksum */
66 
67  /* data covered by checksum */
70 
71  /*
72  * The actual data in the slot that follows can differ based on the above
73  * 'version'.
74  */
75 
78 
/*
 * Size helpers for ReplicationSlotOnDisk.
 *
 * Each replacement list that is a compound expression is fully
 * parenthesized, so the macros can be combined with other operators
 * (e.g. "2 * ReplicationSlotOnDiskChecksummedSize") without the surrounding
 * expression re-associating with the subtraction inside.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)
91 
92 #define SLOT_MAGIC 0x1051CA1 /* format identifier */
93 #define SLOT_VERSION 3 /* version for new files */
94 
95 /* Control array for replication slot management */
97 
98 /* My backend's replication slot in the shared memory array */
100 
101 /* GUC variable */
102 int max_replication_slots = 10; /* the maximum number of replication
103  * slots */
104 
105 static void ReplicationSlotShmemExit(int code, Datum arg);
106 static void ReplicationSlotDropAcquired(void);
107 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
108 
109 /* internal persistency functions */
110 static void RestoreSlotFromDisk(const char *name);
111 static void CreateSlotOnDisk(ReplicationSlot *slot);
112 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
113 
114 /*
115  * Report shared-memory space needed by ReplicationSlotsShmemInit.
116  */
117 Size
119 {
120  Size size = 0;
121 
122  if (max_replication_slots == 0)
123  return size;
124 
125  size = offsetof(ReplicationSlotCtlData, replication_slots);
126  size = add_size(size,
128 
129  return size;
130 }
131 
132 /*
133  * Allocate and initialize shared memory for replication slots.
134  */
135 void
137 {
138  bool found;
139 
140  if (max_replication_slots == 0)
141  return;
142 
144  ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
145  &found);
146 
147  if (!found)
148  {
149  int i;
150 
151  /* First time through, so initialize */
153 
154  for (i = 0; i < max_replication_slots; i++)
155  {
157 
158  /* everything else is zeroed by the memset above */
159  SpinLockInit(&slot->mutex);
163  }
164  }
165 }
166 
167 /*
168  * Register the callback for replication slot cleanup and releasing.
169  */
170 void
172 {
174 }
175 
176 /*
177  * Release and cleanup replication slots.
178  */
179 static void
181 {
182  /* Make sure active replication slots are released */
183  if (MyReplicationSlot != NULL)
185 
186  /* Also cleanup all the temporary slots. */
188 }
189 
190 /*
191  * Check whether the passed slot name is valid and report errors at elevel.
192  *
193  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
194  * the name to be used as a directory name on every supported OS.
195  *
196  * Returns whether the directory name is valid or not if elevel < ERROR.
197  */
198 bool
199 ReplicationSlotValidateName(const char *name, int elevel)
200 {
201  const char *cp;
202 
203  if (strlen(name) == 0)
204  {
205  ereport(elevel,
206  (errcode(ERRCODE_INVALID_NAME),
207  errmsg("replication slot name \"%s\" is too short",
208  name)));
209  return false;
210  }
211 
212  if (strlen(name) >= NAMEDATALEN)
213  {
214  ereport(elevel,
215  (errcode(ERRCODE_NAME_TOO_LONG),
216  errmsg("replication slot name \"%s\" is too long",
217  name)));
218  return false;
219  }
220 
221  for (cp = name; *cp; cp++)
222  {
223  if (!((*cp >= 'a' && *cp <= 'z')
224  || (*cp >= '0' && *cp <= '9')
225  || (*cp == '_')))
226  {
227  ereport(elevel,
228  (errcode(ERRCODE_INVALID_NAME),
229  errmsg("replication slot name \"%s\" contains invalid character",
230  name),
231  errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
232  return false;
233  }
234  }
235  return true;
236 }
237 
238 /*
239  * Create a new replication slot and mark it as used by this backend.
240  *
241  * name: Name of the slot
242  * db_specific: logical decoding is db specific; if the slot is going to
243  * be used for that pass true, otherwise false.
244  * two_phase: Allows decoding of prepared transactions. We allow this option
245  * to be enabled only at the slot creation time. If we allow this option
246  * to be changed during decoding then it is quite possible that we skip
247  * prepare first time because this option was not enabled. Now next time
248  * during getting changes, if the two_phase option is enabled it can skip
249  * prepare because by that time start decoding point has been moved. So the
250  * user will only get commit prepared.
251  */
252 void
253 ReplicationSlotCreate(const char *name, bool db_specific,
254  ReplicationSlotPersistency persistency, bool two_phase)
255 {
256  ReplicationSlot *slot = NULL;
257  int i;
258 
259  Assert(MyReplicationSlot == NULL);
260 
262 
263  /*
264  * If some other backend ran this code concurrently with us, we'd likely
265  * both allocate the same slot, and that would be bad. We'd also be at
266  * risk of missing a name collision. Also, we don't want to try to create
267  * a new slot while somebody's busy cleaning up an old one, because we
268  * might both be monkeying with the same directory.
269  */
270  LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
271 
272  /*
273  * Check for name collision, and identify an allocatable slot. We need to
274  * hold ReplicationSlotControlLock in shared mode for this, so that nobody
275  * else can change the in_use flags while we're looking at them.
276  */
277  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
278  for (i = 0; i < max_replication_slots; i++)
279  {
281 
282  if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
283  ereport(ERROR,
285  errmsg("replication slot \"%s\" already exists", name)));
286  if (!s->in_use && slot == NULL)
287  slot = s;
288  }
289  LWLockRelease(ReplicationSlotControlLock);
290 
291  /* If all slots are in use, we're out of luck. */
292  if (slot == NULL)
293  ereport(ERROR,
294  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
295  errmsg("all replication slots are in use"),
296  errhint("Free one or increase max_replication_slots.")));
297 
298  /*
299  * Since this slot is not in use, nobody should be looking at any part of
300  * it other than the in_use field unless they're trying to allocate it.
301  * And since we hold ReplicationSlotAllocationLock, nobody except us can
302  * be doing that. So it's safe to initialize the slot.
303  */
304  Assert(!slot->in_use);
305  Assert(slot->active_pid == 0);
306 
307  /* first initialize persistent data */
308  memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
309  namestrcpy(&slot->data.name, name);
310  slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
311  slot->data.persistency = persistency;
312  slot->data.two_phase = two_phase;
314 
315  /* and then data only present in shared memory */
316  slot->just_dirtied = false;
317  slot->dirty = false;
324 
325  /*
326  * Create the slot on disk. We haven't actually marked the slot allocated
327  * yet, so no special cleanup is required if this errors out.
328  */
329  CreateSlotOnDisk(slot);
330 
331  /*
332  * We need to briefly prevent any other backend from iterating over the
333  * slots while we flip the in_use flag. We also need to set the active
334  * flag while holding the ControlLock as otherwise a concurrent
335  * ReplicationSlotAcquire() could acquire the slot as well.
336  */
337  LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
338 
339  slot->in_use = true;
340 
341  /* We can now mark the slot active, and that makes it our slot. */
342  SpinLockAcquire(&slot->mutex);
343  Assert(slot->active_pid == 0);
344  slot->active_pid = MyProcPid;
345  SpinLockRelease(&slot->mutex);
346  MyReplicationSlot = slot;
347 
348  LWLockRelease(ReplicationSlotControlLock);
349 
350  /*
351  * Create statistics entry for the new logical slot. We don't collect any
352  * stats for physical slots, so no need to create an entry for the same.
353  * See ReplicationSlotDropPtr for why we need to do this before releasing
354  * ReplicationSlotAllocationLock.
355  */
356  if (SlotIsLogical(slot))
358 
359  /*
360  * Now that the slot has been marked as in_use and active, it's safe to
361  * let somebody else try to allocate a slot.
362  */
363  LWLockRelease(ReplicationSlotAllocationLock);
364 
365  /* Let everybody know we've modified this slot */
367 }
368 
369 /*
370  * Search for the named replication slot.
371  *
372  * Return the replication slot if found, otherwise NULL.
373  */
375 SearchNamedReplicationSlot(const char *name, bool need_lock)
376 {
377  int i;
378  ReplicationSlot *slot = NULL;
379 
380  if (need_lock)
381  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
382 
383  for (i = 0; i < max_replication_slots; i++)
384  {
386 
387  if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
388  {
389  slot = s;
390  break;
391  }
392  }
393 
394  if (need_lock)
395  LWLockRelease(ReplicationSlotControlLock);
396 
397  return slot;
398 }
399 
400 /*
401  * Return the index of the replication slot in
402  * ReplicationSlotCtl->replication_slots.
403  *
404  * This is mainly useful to have an efficient key for storing replication slot
405  * stats.
406  */
407 int
409 {
411  slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
412 
413  return slot - ReplicationSlotCtl->replication_slots;
414 }
415 
416 /*
417  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
418  * the slot's name and true is returned.
419  *
420  * This likely is only useful for pgstat_replslot.c during shutdown, in other
421  * cases there are obvious TOCTOU issues.
422  */
423 bool
425 {
426  ReplicationSlot *slot;
427  bool found;
428 
430 
431  /*
432  * Ensure that the slot cannot be dropped while we copy the name. Don't
433  * need the spinlock as the name of an existing slot cannot change.
434  */
435  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
436  found = slot->in_use;
437  if (slot->in_use)
438  namestrcpy(name, NameStr(slot->data.name));
439  LWLockRelease(ReplicationSlotControlLock);
440 
441  return found;
442 }
443 
444 /*
445  * Find a previously created slot and mark it as used by this process.
446  *
447  * An error is raised if nowait is true and the slot is currently in use. If
448  * nowait is false, we sleep until the slot is released by the owning process.
449  */
450 void
451 ReplicationSlotAcquire(const char *name, bool nowait)
452 {
453  ReplicationSlot *s;
454  int active_pid;
455 
456  Assert(name != NULL);
457 
458 retry:
459  Assert(MyReplicationSlot == NULL);
460 
461  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
462 
463  /*
464  * Search for the slot with the specified name. If the slot is not found,
465  * we error out.
466  */
467  s = SearchNamedReplicationSlot(name, false);
468  if (s == NULL || !s->in_use)
469  {
470  LWLockRelease(ReplicationSlotControlLock);
471 
472  ereport(ERROR,
473  (errcode(ERRCODE_UNDEFINED_OBJECT),
474  errmsg("replication slot \"%s\" does not exist",
475  name)));
476  }
477 
478  /*
479  * This is the slot we want; check if it's active under some other
480  * process. In single user mode, we don't need this check.
481  */
482  if (IsUnderPostmaster)
483  {
484  /*
485  * Get ready to sleep on the slot in case it is active. (We may end
486  * up not sleeping, but we don't want to do this while holding the
487  * spinlock.)
488  */
489  if (!nowait)
491 
492  SpinLockAcquire(&s->mutex);
493  if (s->active_pid == 0)
494  s->active_pid = MyProcPid;
495  active_pid = s->active_pid;
496  SpinLockRelease(&s->mutex);
497  }
498  else
499  active_pid = MyProcPid;
500  LWLockRelease(ReplicationSlotControlLock);
501 
502  /*
503  * If we found the slot but it's already active in another process, we
504  * wait until the owning process signals us that it's been released, or
505  * error out.
506  */
507  if (active_pid != MyProcPid)
508  {
509  if (!nowait)
510  {
511  /* Wait here until we get signaled, and then restart */
515  goto retry;
516  }
517 
518  ereport(ERROR,
519  (errcode(ERRCODE_OBJECT_IN_USE),
520  errmsg("replication slot \"%s\" is active for PID %d",
521  NameStr(s->data.name), active_pid)));
522  }
523  else if (!nowait)
524  ConditionVariableCancelSleep(); /* no sleep needed after all */
525 
526  /* Let everybody know we've modified this slot */
528 
529  /* We made this slot active, so it's ours now. */
530  MyReplicationSlot = s;
531 
532  /*
533  * The call to pgstat_acquire_replslot() protects against stats for a
534  * different slot, from before a restart or such, being present during
535  * pgstat_report_replslot().
536  */
537  if (SlotIsLogical(s))
539 }
540 
541 /*
542  * Release the replication slot that this backend considers to own.
543  *
544  * This or another backend can re-acquire the slot later.
545  * Resources this slot requires will be preserved.
546  */
547 void
549 {
551 
552  Assert(slot != NULL && slot->active_pid != 0);
553 
554  if (slot->data.persistency == RS_EPHEMERAL)
555  {
556  /*
557  * Delete the slot. There is no !PANIC case where this is allowed to
558  * fail, all that may happen is an incomplete cleanup of the on-disk
559  * data.
560  */
562  }
563 
564  /*
565  * If slot needed to temporarily restrain both data and catalog xmin to
566  * create the catalog snapshot, remove that temporary constraint.
567  * Snapshots can only be exported while the initial snapshot is still
568  * acquired.
569  */
570  if (!TransactionIdIsValid(slot->data.xmin) &&
572  {
573  SpinLockAcquire(&slot->mutex);
575  SpinLockRelease(&slot->mutex);
577  }
578 
579  if (slot->data.persistency == RS_PERSISTENT)
580  {
581  /*
582  * Mark persistent slot inactive. We're not freeing it, just
583  * disconnecting, but wake up others that may be waiting for it.
584  */
585  SpinLockAcquire(&slot->mutex);
586  slot->active_pid = 0;
587  SpinLockRelease(&slot->mutex);
589  }
590 
591  MyReplicationSlot = NULL;
592 
593  /* might not have been set when we've been a plain slot */
594  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
597  LWLockRelease(ProcArrayLock);
598 }
599 
600 /*
601  * Cleanup all temporary slots created in current session.
602  */
603 void
605 {
606  int i;
607 
608  Assert(MyReplicationSlot == NULL);
609 
610 restart:
611  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
612  for (i = 0; i < max_replication_slots; i++)
613  {
615 
616  if (!s->in_use)
617  continue;
618 
619  SpinLockAcquire(&s->mutex);
620  if (s->active_pid == MyProcPid)
621  {
623  SpinLockRelease(&s->mutex);
624  LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
625 
627 
629  goto restart;
630  }
631  else
632  SpinLockRelease(&s->mutex);
633  }
634 
635  LWLockRelease(ReplicationSlotControlLock);
636 }
637 
638 /*
639  * Permanently drop replication slot identified by the passed in name.
640  */
641 void
642 ReplicationSlotDrop(const char *name, bool nowait)
643 {
644  Assert(MyReplicationSlot == NULL);
645 
646  ReplicationSlotAcquire(name, nowait);
647 
649 }
650 
651 /*
652  * Permanently drop the currently acquired replication slot.
653  */
654 static void
656 {
658 
659  Assert(MyReplicationSlot != NULL);
660 
661  /* slot isn't acquired anymore */
662  MyReplicationSlot = NULL;
663 
665 }
666 
667 /*
668  * Permanently drop the replication slot which will be released by the point
669  * this function returns.
670  */
671 static void
673 {
674  char path[MAXPGPATH];
675  char tmppath[MAXPGPATH];
676 
677  /*
678  * If some other backend ran this code concurrently with us, we might try
679  * to delete a slot with a certain name while someone else was trying to
680  * create a slot with the same name.
681  */
682  LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
683 
684  /* Generate pathnames. */
685  sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
686  sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
687 
688  /*
689  * Rename the slot directory on disk, so that we'll no longer recognize
690  * this as a valid slot. Note that if this fails, we've got to mark the
691  * slot inactive before bailing out. If we're dropping an ephemeral or a
692  * temporary slot, we better never fail hard as the caller won't expect
693  * the slot to survive and this might get called during error handling.
694  */
695  if (rename(path, tmppath) == 0)
696  {
697  /*
698  * We need to fsync() the directory we just renamed and its parent to
699  * make sure that our changes are on disk in a crash-safe fashion. If
700  * fsync() fails, we can't be sure whether the changes are on disk or
701  * not. For now, we handle that by panicking;
702  * StartupReplicationSlots() will try to straighten it out after
703  * restart.
704  */
706  fsync_fname(tmppath, true);
707  fsync_fname("pg_replslot", true);
709  }
710  else
711  {
712  bool fail_softly = slot->data.persistency != RS_PERSISTENT;
713 
714  SpinLockAcquire(&slot->mutex);
715  slot->active_pid = 0;
716  SpinLockRelease(&slot->mutex);
717 
718  /* wake up anyone waiting on this slot */
720 
721  ereport(fail_softly ? WARNING : ERROR,
723  errmsg("could not rename file \"%s\" to \"%s\": %m",
724  path, tmppath)));
725  }
726 
727  /*
728  * The slot is definitely gone. Lock out concurrent scans of the array
729  * long enough to kill it. It's OK to clear the active PID here without
730  * grabbing the mutex because nobody else can be scanning the array here,
731  * and nobody can be attached to this slot and thus access it without
732  * scanning the array.
733  *
734  * Also wake up processes waiting for it.
735  */
736  LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
737  slot->active_pid = 0;
738  slot->in_use = false;
739  LWLockRelease(ReplicationSlotControlLock);
741 
742  /*
743  * Slot is dead and doesn't prevent resource removal anymore, recompute
744  * limits.
745  */
748 
749  /*
750  * If removing the directory fails, the worst thing that will happen is
751  * that the user won't be able to create a new slot with the same name
752  * until the next server restart. We warn about it, but that's all.
753  */
754  if (!rmtree(tmppath, true))
756  (errmsg("could not remove directory \"%s\"", tmppath)));
757 
758  /*
759  * Drop the statistics entry for the replication slot. Do this while
760  * holding ReplicationSlotAllocationLock so that we don't drop a
761  * statistics entry for another slot with the same name just created in
762  * another session.
763  */
764  if (SlotIsLogical(slot))
765  pgstat_drop_replslot(slot);
766 
767  /*
768  * We release this at the very end, so that nobody starts trying to create
769  * a slot while we're still cleaning up the detritus of the old one.
770  */
771  LWLockRelease(ReplicationSlotAllocationLock);
772 }
773 
774 /*
775  * Serialize the currently acquired slot's state from memory to disk, thereby
776  * guaranteeing the current state will survive a crash.
777  */
778 void
780 {
781  char path[MAXPGPATH];
782 
783  Assert(MyReplicationSlot != NULL);
784 
785  sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
787 }
788 
789 /*
790  * Signal that it would be useful if the currently acquired slot would be
791  * flushed out to disk.
792  *
793  * Note that the actual flush to disk can be delayed for a long time, if
794  * required for correctness explicitly do a ReplicationSlotSave().
795  */
796 void
798 {
800 
801  Assert(MyReplicationSlot != NULL);
802 
803  SpinLockAcquire(&slot->mutex);
805  MyReplicationSlot->dirty = true;
806  SpinLockRelease(&slot->mutex);
807 }
808 
809 /*
810  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
811  * guaranteeing it will be there after an eventual crash.
812  */
813 void
815 {
817 
818  Assert(slot != NULL);
820 
821  SpinLockAcquire(&slot->mutex);
823  SpinLockRelease(&slot->mutex);
824 
827 }
828 
829 /*
830  * Compute the oldest xmin across all slots and store it in the ProcArray.
831  *
832  * If already_locked is true, ProcArrayLock has already been acquired
833  * exclusively.
834  */
835 void
837 {
838  int i;
840  TransactionId agg_catalog_xmin = InvalidTransactionId;
841 
842  Assert(ReplicationSlotCtl != NULL);
843 
844  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
845 
846  for (i = 0; i < max_replication_slots; i++)
847  {
849  TransactionId effective_xmin;
850  TransactionId effective_catalog_xmin;
851  bool invalidated;
852 
853  if (!s->in_use)
854  continue;
855 
856  SpinLockAcquire(&s->mutex);
857  effective_xmin = s->effective_xmin;
858  effective_catalog_xmin = s->effective_catalog_xmin;
859  invalidated = s->data.invalidated != RS_INVAL_NONE;
860  SpinLockRelease(&s->mutex);
861 
862  /* invalidated slots need not apply */
863  if (invalidated)
864  continue;
865 
866  /* check the data xmin */
867  if (TransactionIdIsValid(effective_xmin) &&
868  (!TransactionIdIsValid(agg_xmin) ||
869  TransactionIdPrecedes(effective_xmin, agg_xmin)))
870  agg_xmin = effective_xmin;
871 
872  /* check the catalog xmin */
873  if (TransactionIdIsValid(effective_catalog_xmin) &&
874  (!TransactionIdIsValid(agg_catalog_xmin) ||
875  TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
876  agg_catalog_xmin = effective_catalog_xmin;
877  }
878 
879  LWLockRelease(ReplicationSlotControlLock);
880 
881  ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
882 }
883 
884 /*
885  * Compute the oldest restart LSN across all slots and inform xlog module.
886  *
887  * Note: while max_slot_wal_keep_size is theoretically relevant for this
888  * purpose, we don't try to account for that, because this module doesn't
889  * know what to compare against.
890  */
891 void
893 {
894  int i;
895  XLogRecPtr min_required = InvalidXLogRecPtr;
896 
897  Assert(ReplicationSlotCtl != NULL);
898 
899  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
900  for (i = 0; i < max_replication_slots; i++)
901  {
903  XLogRecPtr restart_lsn;
904  bool invalidated;
905 
906  if (!s->in_use)
907  continue;
908 
909  SpinLockAcquire(&s->mutex);
910  restart_lsn = s->data.restart_lsn;
911  invalidated = s->data.invalidated != RS_INVAL_NONE;
912  SpinLockRelease(&s->mutex);
913 
914  /* invalidated slots need not apply */
915  if (invalidated)
916  continue;
917 
918  if (restart_lsn != InvalidXLogRecPtr &&
919  (min_required == InvalidXLogRecPtr ||
920  restart_lsn < min_required))
921  min_required = restart_lsn;
922  }
923  LWLockRelease(ReplicationSlotControlLock);
924 
925  XLogSetReplicationSlotMinimumLSN(min_required);
926 }
927 
928 /*
929  * Compute the oldest WAL LSN required by *logical* decoding slots.
930  *
931  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
932  * slots exist.
933  *
934  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
935  * ignores physical replication slots.
936  *
937  * The results aren't required frequently, so we don't maintain a precomputed
938  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
939  */
942 {
943  XLogRecPtr result = InvalidXLogRecPtr;
944  int i;
945 
946  if (max_replication_slots <= 0)
947  return InvalidXLogRecPtr;
948 
949  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
950 
951  for (i = 0; i < max_replication_slots; i++)
952  {
953  ReplicationSlot *s;
954  XLogRecPtr restart_lsn;
955  bool invalidated;
956 
958 
959  /* cannot change while ReplicationSlotCtlLock is held */
960  if (!s->in_use)
961  continue;
962 
963  /* we're only interested in logical slots */
964  if (!SlotIsLogical(s))
965  continue;
966 
967  /* read once, it's ok if it increases while we're checking */
968  SpinLockAcquire(&s->mutex);
969  restart_lsn = s->data.restart_lsn;
970  invalidated = s->data.invalidated != RS_INVAL_NONE;
971  SpinLockRelease(&s->mutex);
972 
973  /* invalidated slots need not apply */
974  if (invalidated)
975  continue;
976 
977  if (restart_lsn == InvalidXLogRecPtr)
978  continue;
979 
980  if (result == InvalidXLogRecPtr ||
981  restart_lsn < result)
982  result = restart_lsn;
983  }
984 
985  LWLockRelease(ReplicationSlotControlLock);
986 
987  return result;
988 }
989 
990 /*
991  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
992  * passed database oid.
993  *
994  * Returns true if there are any slots referencing the database. *nslots will
995  * be set to the absolute number of slots in the database, *nactive to ones
996  * currently active.
997  */
998 bool
999 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1000 {
1001  int i;
1002 
1003  *nslots = *nactive = 0;
1004 
1005  if (max_replication_slots <= 0)
1006  return false;
1007 
1008  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1009  for (i = 0; i < max_replication_slots; i++)
1010  {
1011  ReplicationSlot *s;
1012 
1014 
1015  /* cannot change while ReplicationSlotCtlLock is held */
1016  if (!s->in_use)
1017  continue;
1018 
1019  /* only logical slots are database specific, skip */
1020  if (!SlotIsLogical(s))
1021  continue;
1022 
1023  /* not our database, skip */
1024  if (s->data.database != dboid)
1025  continue;
1026 
1027  /* NB: intentionally counting invalidated slots */
1028 
1029  /* count slots with spinlock held */
1030  SpinLockAcquire(&s->mutex);
1031  (*nslots)++;
1032  if (s->active_pid != 0)
1033  (*nactive)++;
1034  SpinLockRelease(&s->mutex);
1035  }
1036  LWLockRelease(ReplicationSlotControlLock);
1037 
1038  if (*nslots > 0)
1039  return true;
1040  return false;
1041 }
1042 
1043 /*
1044  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1045  * passed database oid. The caller should hold an exclusive lock on the
1046  * pg_database oid for the database to prevent creation of new slots on the db
1047  * or replay from existing slots.
1048  *
1049  * Another session that concurrently acquires an existing slot on the target DB
1050  * (most likely to drop it) may cause this function to ERROR. If that happens
1051  * it may have dropped some but not all slots.
1052  *
1053  * This routine isn't as efficient as it could be - but we don't drop
1054  * databases often, especially databases with lots of slots.
1055  */
1056 void
1058 {
1059  int i;
1060 
1061  if (max_replication_slots <= 0)
1062  return;
1063 
1064 restart:
1065  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1066  for (i = 0; i < max_replication_slots; i++)
1067  {
1068  ReplicationSlot *s;
1069  char *slotname;
1070  int active_pid;
1071 
1073 
1074  /* cannot change while ReplicationSlotCtlLock is held */
1075  if (!s->in_use)
1076  continue;
1077 
1078  /* only logical slots are database specific, skip */
1079  if (!SlotIsLogical(s))
1080  continue;
1081 
1082  /* not our database, skip */
1083  if (s->data.database != dboid)
1084  continue;
1085 
1086  /* NB: intentionally including invalidated slots */
1087 
1088  /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1089  SpinLockAcquire(&s->mutex);
1090  /* can't change while ReplicationSlotControlLock is held */
1091  slotname = NameStr(s->data.name);
1092  active_pid = s->active_pid;
1093  if (active_pid == 0)
1094  {
1095  MyReplicationSlot = s;
1096  s->active_pid = MyProcPid;
1097  }
1098  SpinLockRelease(&s->mutex);
1099 
1100  /*
1101  * Even though we hold an exclusive lock on the database object a
1102  * logical slot for that DB can still be active, e.g. if it's
1103  * concurrently being dropped by a backend connected to another DB.
1104  *
1105  * That's fairly unlikely in practice, so we'll just bail out.
1106  */
1107  if (active_pid)
1108  ereport(ERROR,
1109  (errcode(ERRCODE_OBJECT_IN_USE),
1110  errmsg("replication slot \"%s\" is active for PID %d",
1111  slotname, active_pid)));
1112 
1113  /*
1114  * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1115  * holding ReplicationSlotControlLock over filesystem operations,
1116  * release ReplicationSlotControlLock and use
1117  * ReplicationSlotDropAcquired.
1118  *
1119  * As that means the set of slots could change, restart scan from the
1120  * beginning each time we release the lock.
1121  */
1122  LWLockRelease(ReplicationSlotControlLock);
1124  goto restart;
1125  }
1126  LWLockRelease(ReplicationSlotControlLock);
1127 }
1128 
1129 
1130 /*
1131  * Check whether the server's configuration supports using replication
1132  * slots.
1133  */
1134 void
1136 {
1137  /*
1138  * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1139  * needs the same check.
1140  */
1141 
1142  if (max_replication_slots == 0)
1143  ereport(ERROR,
1144  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1145  errmsg("replication slots can only be used if max_replication_slots > 0")));
1146 
1148  ereport(ERROR,
1149  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1150  errmsg("replication slots can only be used if wal_level >= replica")));
1151 }
1152 
1153 /*
1154  * Check whether the user has privilege to use replication slots.
1155  */
1156 void
1158 {
1159  if (!has_rolreplication(GetUserId()))
1160  ereport(ERROR,
1161  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1162  errmsg("permission denied to use replication slots"),
1163  errdetail("Only roles with the %s attribute may use replication slots.",
1164  "REPLICATION")));
1165 }
1166 
1167 /*
1168  * Reserve WAL for the currently active slot.
1169  *
1170  * Compute and set restart_lsn in a manner that's appropriate for the type of
1171  * the slot and concurrency safe.
1172  */
1173 void
1175 {
1177 
1178  Assert(slot != NULL);
1180 
1181  /*
1182  * The replication slot mechanism is used to prevent removal of required
1183  * WAL. As there is no interlock between this routine and checkpoints, WAL
1184  * segments could concurrently be removed when a now stale return value of
1185  * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1186  * this happens we'll just retry.
1187  */
1188  while (true)
1189  {
1190  XLogSegNo segno;
1191  XLogRecPtr restart_lsn;
1192 
1193  /*
1194  * For logical slots log a standby snapshot and start logical decoding
1195  * at exactly that position. That allows the slot to start up more
1196  * quickly. But on a standby we cannot do WAL writes, so just use the
1197  * replay pointer; effectively, an attempt to create a logical slot on
1198  * standby will cause it to wait for an xl_running_xact record to be
1199  * logged independently on the primary, so that a snapshot can be
1200  * built using the record.
1201  *
1202  * None of this is needed (or indeed helpful) for physical slots as
1203  * they'll start replay at the last logged checkpoint anyway. Instead
1204  * return the location of the last redo LSN. While that slightly
1205  * increases the chance that we have to retry, it's where a base
1206  * backup has to start replay at.
1207  */
1208  if (SlotIsPhysical(slot))
1209  restart_lsn = GetRedoRecPtr();
1210  else if (RecoveryInProgress())
1211  restart_lsn = GetXLogReplayRecPtr(NULL);
1212  else
1213  restart_lsn = GetXLogInsertRecPtr();
1214 
1215  SpinLockAcquire(&slot->mutex);
1216  slot->data.restart_lsn = restart_lsn;
1217  SpinLockRelease(&slot->mutex);
1218 
1219  /* prevent WAL removal as fast as possible */
1221 
1222  /*
1223  * If all required WAL is still there, great, otherwise retry. The
1224  * slot should prevent further removal of WAL, unless there's a
1225  * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1226  * the new restart_lsn above, so normally we should never need to loop
1227  * more than twice.
1228  */
1230  if (XLogGetLastRemovedSegno() < segno)
1231  break;
1232  }
1233 
1234  if (!RecoveryInProgress() && SlotIsLogical(slot))
1235  {
1236  XLogRecPtr flushptr;
1237 
1238  /* make sure we have enough information to start */
1239  flushptr = LogStandbySnapshot();
1240 
1241  /* and make sure it's fsynced to disk */
1242  XLogFlush(flushptr);
1243  }
1244 }
1245 
1246 /*
1247  * Report that replication slot needs to be invalidated
1248  */
1249 static void
1251  bool terminating,
1252  int pid,
1253  NameData slotname,
1254  XLogRecPtr restart_lsn,
1255  XLogRecPtr oldestLSN,
1256  TransactionId snapshotConflictHorizon)
1257 {
1258  StringInfoData err_detail;
1259  bool hint = false;
1260 
1261  initStringInfo(&err_detail);
1262 
1263  switch (cause)
1264  {
1265  case RS_INVAL_WAL_REMOVED:
1266  hint = true;
1267  appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
1268  LSN_FORMAT_ARGS(restart_lsn),
1269  (unsigned long long) (oldestLSN - restart_lsn));
1270  break;
1271  case RS_INVAL_HORIZON:
1272  appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1273  snapshotConflictHorizon);
1274  break;
1275 
1276  case RS_INVAL_WAL_LEVEL:
1277  appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
1278  break;
1279  case RS_INVAL_NONE:
1280  pg_unreachable();
1281  }
1282 
1283  ereport(LOG,
1284  terminating ?
1285  errmsg("terminating process %d to release replication slot \"%s\"",
1286  pid, NameStr(slotname)) :
1287  errmsg("invalidating obsolete replication slot \"%s\"",
1288  NameStr(slotname)),
1289  errdetail_internal("%s", err_detail.data),
1290  hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
1291 
1292  pfree(err_detail.data);
1293 }
1294 
1295 /*
1296  * Helper for InvalidateObsoleteReplicationSlots
1297  *
1298  * Acquires the given slot and mark it invalid, if necessary and possible.
1299  *
1300  * Returns whether ReplicationSlotControlLock was released in the interim (and
1301  * in that case we're not holding the lock at return, otherwise we are).
1302  *
1303  * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1304  *
1305  * This is inherently racy, because we release the LWLock
1306  * for syscalls, so caller must restart if we return true.
1307  */
1308 static bool
1310  ReplicationSlot *s,
1311  XLogRecPtr oldestLSN,
1312  Oid dboid, TransactionId snapshotConflictHorizon,
1313  bool *invalidated)
1314 {
1315  int last_signaled_pid = 0;
1316  bool released_lock = false;
1317 
1318  for (;;)
1319  {
1320  XLogRecPtr restart_lsn;
1321  NameData slotname;
1322  int active_pid = 0;
1324 
1325  Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1326 
1327  if (!s->in_use)
1328  {
1329  if (released_lock)
1330  LWLockRelease(ReplicationSlotControlLock);
1331  break;
1332  }
1333 
1334  /*
1335  * Check if the slot needs to be invalidated. If it needs to be
1336  * invalidated, and is not currently acquired, acquire it and mark it
1337  * as having been invalidated. We do this with the spinlock held to
1338  * avoid race conditions -- for example the restart_lsn could move
1339  * forward, or the slot could be dropped.
1340  */
1341  SpinLockAcquire(&s->mutex);
1342 
1343  restart_lsn = s->data.restart_lsn;
1344 
1345  /*
1346  * If the slot is already invalid or is a non conflicting slot, we
1347  * don't need to do anything.
1348  */
1349  if (s->data.invalidated == RS_INVAL_NONE)
1350  {
1351  switch (cause)
1352  {
1353  case RS_INVAL_WAL_REMOVED:
1354  if (s->data.restart_lsn != InvalidXLogRecPtr &&
1355  s->data.restart_lsn < oldestLSN)
1356  conflict = cause;
1357  break;
1358  case RS_INVAL_HORIZON:
1359  if (!SlotIsLogical(s))
1360  break;
1361  /* invalid DB oid signals a shared relation */
1362  if (dboid != InvalidOid && dboid != s->data.database)
1363  break;
1366  snapshotConflictHorizon))
1367  conflict = cause;
1370  snapshotConflictHorizon))
1371  conflict = cause;
1372  break;
1373  case RS_INVAL_WAL_LEVEL:
1374  if (SlotIsLogical(s))
1375  conflict = cause;
1376  break;
1377  case RS_INVAL_NONE:
1378  pg_unreachable();
1379  }
1380  }
1381 
1382  /* if there's no conflict, we're done */
1383  if (conflict == RS_INVAL_NONE)
1384  {
1385  SpinLockRelease(&s->mutex);
1386  if (released_lock)
1387  LWLockRelease(ReplicationSlotControlLock);
1388  break;
1389  }
1390 
1391  slotname = s->data.name;
1392  active_pid = s->active_pid;
1393 
1394  /*
1395  * If the slot can be acquired, do so and mark it invalidated
1396  * immediately. Otherwise we'll signal the owning process, below, and
1397  * retry.
1398  */
1399  if (active_pid == 0)
1400  {
1401  MyReplicationSlot = s;
1402  s->active_pid = MyProcPid;
1403  s->data.invalidated = conflict;
1404 
1405  /*
1406  * XXX: We should consider not overwriting restart_lsn and instead
1407  * just rely on .invalidated.
1408  */
1409  if (conflict == RS_INVAL_WAL_REMOVED)
1411 
1412  /* Let caller know */
1413  *invalidated = true;
1414  }
1415 
1416  SpinLockRelease(&s->mutex);
1417 
1418  if (active_pid != 0)
1419  {
1420  /*
1421  * Prepare the sleep on the slot's condition variable before
1422  * releasing the lock, to close a possible race condition if the
1423  * slot is released before the sleep below.
1424  */
1426 
1427  LWLockRelease(ReplicationSlotControlLock);
1428  released_lock = true;
1429 
1430  /*
1431  * Signal to terminate the process that owns the slot, if we
1432  * haven't already signalled it. (Avoidance of repeated
1433  * signalling is the only reason for there to be a loop in this
1434  * routine; otherwise we could rely on caller's restart loop.)
1435  *
1436  * There is the race condition that other process may own the slot
1437  * after its current owner process is terminated and before this
1438  * process owns it. To handle that, we signal only if the PID of
1439  * the owning process has changed from the previous time. (This
1440  * logic assumes that the same PID is not reused very quickly.)
1441  */
1442  if (last_signaled_pid != active_pid)
1443  {
1444  ReportSlotInvalidation(conflict, true, active_pid,
1445  slotname, restart_lsn,
1446  oldestLSN, snapshotConflictHorizon);
1447 
1448  if (MyBackendType == B_STARTUP)
1449  (void) SendProcSignal(active_pid,
1452  else
1453  (void) kill(active_pid, SIGTERM);
1454 
1455  last_signaled_pid = active_pid;
1456  }
1457 
1458  /* Wait until the slot is released. */
1461 
1462  /*
1463  * Re-acquire lock and start over; we expect to invalidate the
1464  * slot next time (unless another process acquires the slot in the
1465  * meantime).
1466  */
1467  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1468  continue;
1469  }
1470  else
1471  {
1472  /*
1473  * We hold the slot now and have already invalidated it; flush it
1474  * to ensure that state persists.
1475  *
1476  * Don't want to hold ReplicationSlotControlLock across file
1477  * system operations, so release it now but be sure to tell caller
1478  * to restart from scratch.
1479  */
1480  LWLockRelease(ReplicationSlotControlLock);
1481  released_lock = true;
1482 
1483  /* Make sure the invalidated state persists across server restart */
1488 
1489  ReportSlotInvalidation(conflict, false, active_pid,
1490  slotname, restart_lsn,
1491  oldestLSN, snapshotConflictHorizon);
1492 
1493  /* done with this slot for now */
1494  break;
1495  }
1496  }
1497 
1498  Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1499 
1500  return released_lock;
1501 }
1502 
1503 /*
1504  * Invalidate slots that require resources about to be removed.
1505  *
1506  * Returns true when any slot have got invalidated.
1507  *
1508  * Whether a slot needs to be invalidated depends on the cause. A slot is
1509  * removed if it:
1510  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1511  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1512  * db; dboid may be InvalidOid for shared relations
1513  * - RS_INVAL_WAL_LEVEL: is logical
1514  *
1515  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1516  */
1517 bool
1519  XLogSegNo oldestSegno, Oid dboid,
1520  TransactionId snapshotConflictHorizon)
1521 {
1522  XLogRecPtr oldestLSN;
1523  bool invalidated = false;
1524 
1525  Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1526  Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1527  Assert(cause != RS_INVAL_NONE);
1528 
1529  if (max_replication_slots == 0)
1530  return invalidated;
1531 
1532  XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1533 
1534 restart:
1535  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1536  for (int i = 0; i < max_replication_slots; i++)
1537  {
1539 
1540  if (!s->in_use)
1541  continue;
1542 
1543  if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1544  snapshotConflictHorizon,
1545  &invalidated))
1546  {
1547  /* if the lock was released, start from scratch */
1548  goto restart;
1549  }
1550  }
1551  LWLockRelease(ReplicationSlotControlLock);
1552 
1553  /*
1554  * If any slots have been invalidated, recalculate the resource limits.
1555  */
1556  if (invalidated)
1557  {
1560  }
1561 
1562  return invalidated;
1563 }
1564 
1565 /*
1566  * Flush all replication slots to disk.
1567  *
1568  * This needn't actually be part of a checkpoint, but it's a convenient
1569  * location.
1570  */
1571 void
1573 {
1574  int i;
1575 
1576  elog(DEBUG1, "performing replication slot checkpoint");
1577 
1578  /*
1579  * Prevent any slot from being created/dropped while we're active. As we
1580  * explicitly do *not* want to block iterating over replication_slots or
1581  * acquiring a slot we cannot take the control lock - but that's OK,
1582  * because holding ReplicationSlotAllocationLock is strictly stronger, and
1583  * enough to guarantee that nobody can change the in_use bits on us.
1584  */
1585  LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1586 
1587  for (i = 0; i < max_replication_slots; i++)
1588  {
1590  char path[MAXPGPATH];
1591 
1592  if (!s->in_use)
1593  continue;
1594 
1595  /* save the slot to disk, locking is handled in SaveSlotToPath() */
1596  sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1597  SaveSlotToPath(s, path, LOG);
1598  }
1599  LWLockRelease(ReplicationSlotAllocationLock);
1600 }
1601 
1602 /*
1603  * Load all replication slots from disk into memory at server startup. This
1604  * needs to be run before we start crash recovery.
1605  */
1606 void
1608 {
1609  DIR *replication_dir;
1610  struct dirent *replication_de;
1611 
1612  elog(DEBUG1, "starting up replication slots");
1613 
1614  /* restore all slots by iterating over all on-disk entries */
1615  replication_dir = AllocateDir("pg_replslot");
1616  while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1617  {
1618  char path[MAXPGPATH + 12];
1619  PGFileType de_type;
1620 
1621  if (strcmp(replication_de->d_name, ".") == 0 ||
1622  strcmp(replication_de->d_name, "..") == 0)
1623  continue;
1624 
1625  snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1626  de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1627 
1628  /* we're only creating directories here, skip if it's not our's */
1629  if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
1630  continue;
1631 
1632  /* we crashed while a slot was being setup or deleted, clean up */
1633  if (pg_str_endswith(replication_de->d_name, ".tmp"))
1634  {
1635  if (!rmtree(path, true))
1636  {
1637  ereport(WARNING,
1638  (errmsg("could not remove directory \"%s\"",
1639  path)));
1640  continue;
1641  }
1642  fsync_fname("pg_replslot", true);
1643  continue;
1644  }
1645 
1646  /* looks like a slot in a normal state, restore */
1647  RestoreSlotFromDisk(replication_de->d_name);
1648  }
1649  FreeDir(replication_dir);
1650 
1651  /* currently no slots exist, we're done. */
1652  if (max_replication_slots <= 0)
1653  return;
1654 
1655  /* Now that we have recovered all the data, compute replication xmin */
1658 }
1659 
1660 /* ----
1661  * Manipulation of on-disk state of replication slots
1662  *
1663  * NB: none of the routines below should take any notice whether a slot is the
1664  * current one or not, that's all handled a layer above.
1665  * ----
1666  */
1667 static void
1669 {
1670  char tmppath[MAXPGPATH];
1671  char path[MAXPGPATH];
1672  struct stat st;
1673 
1674  /*
1675  * No need to take out the io_in_progress_lock, nobody else can see this
1676  * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1677  * takes out the lock, if we'd take the lock here, we'd deadlock.
1678  */
1679 
1680  sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1681  sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1682 
1683  /*
1684  * It's just barely possible that some previous effort to create or drop a
1685  * slot with this name left a temp directory lying around. If that seems
1686  * to be the case, try to remove it. If the rmtree() fails, we'll error
1687  * out at the MakePGDirectory() below, so we don't bother checking
1688  * success.
1689  */
1690  if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1691  rmtree(tmppath, true);
1692 
1693  /* Create and fsync the temporary slot directory. */
1694  if (MakePGDirectory(tmppath) < 0)
1695  ereport(ERROR,
1697  errmsg("could not create directory \"%s\": %m",
1698  tmppath)));
1699  fsync_fname(tmppath, true);
1700 
1701  /* Write the actual state file. */
1702  slot->dirty = true; /* signal that we really need to write */
1703  SaveSlotToPath(slot, tmppath, ERROR);
1704 
1705  /* Rename the directory into place. */
1706  if (rename(tmppath, path) != 0)
1707  ereport(ERROR,
1709  errmsg("could not rename file \"%s\" to \"%s\": %m",
1710  tmppath, path)));
1711 
1712  /*
1713  * If we'd now fail - really unlikely - we wouldn't know whether this slot
1714  * would persist after an OS crash or not - so, force a restart. The
1715  * restart would try to fsync this again till it works.
1716  */
1718 
1719  fsync_fname(path, true);
1720  fsync_fname("pg_replslot", true);
1721 
1722  END_CRIT_SECTION();
1723 }
1724 
1725 /*
1726  * Shared functionality between saving and creating a replication slot.
1727  */
1728 static void
1729 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1730 {
1731  char tmppath[MAXPGPATH];
1732  char path[MAXPGPATH];
1733  int fd;
1735  bool was_dirty;
1736 
1737  /* first check whether there's something to write out */
1738  SpinLockAcquire(&slot->mutex);
1739  was_dirty = slot->dirty;
1740  slot->just_dirtied = false;
1741  SpinLockRelease(&slot->mutex);
1742 
1743  /* and don't do anything if there's nothing to write */
1744  if (!was_dirty)
1745  return;
1746 
1748 
1749  /* silence valgrind :( */
1750  memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1751 
1752  sprintf(tmppath, "%s/state.tmp", dir);
1753  sprintf(path, "%s/state", dir);
1754 
1755  fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1756  if (fd < 0)
1757  {
1758  /*
1759  * If not an ERROR, then release the lock before returning. In case
1760  * of an ERROR, the error recovery path automatically releases the
1761  * lock, but no harm in explicitly releasing even in that case. Note
1762  * that LWLockRelease() could affect errno.
1763  */
1764  int save_errno = errno;
1765 
1767  errno = save_errno;
1768  ereport(elevel,
1770  errmsg("could not create file \"%s\": %m",
1771  tmppath)));
1772  return;
1773  }
1774 
1775  cp.magic = SLOT_MAGIC;
1776  INIT_CRC32C(cp.checksum);
1777  cp.version = SLOT_VERSION;
1779 
1780  SpinLockAcquire(&slot->mutex);
1781 
1782  memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1783 
1784  SpinLockRelease(&slot->mutex);
1785 
1786  COMP_CRC32C(cp.checksum,
1787  (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
1789  FIN_CRC32C(cp.checksum);
1790 
1791  errno = 0;
1793  if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1794  {
1795  int save_errno = errno;
1796 
1800 
1801  /* if write didn't set errno, assume problem is no disk space */
1802  errno = save_errno ? save_errno : ENOSPC;
1803  ereport(elevel,
1805  errmsg("could not write to file \"%s\": %m",
1806  tmppath)));
1807  return;
1808  }
1810 
1811  /* fsync the temporary file */
1813  if (pg_fsync(fd) != 0)
1814  {
1815  int save_errno = errno;
1816 
1820  errno = save_errno;
1821  ereport(elevel,
1823  errmsg("could not fsync file \"%s\": %m",
1824  tmppath)));
1825  return;
1826  }
1828 
1829  if (CloseTransientFile(fd) != 0)
1830  {
1831  int save_errno = errno;
1832 
1834  errno = save_errno;
1835  ereport(elevel,
1837  errmsg("could not close file \"%s\": %m",
1838  tmppath)));
1839  return;
1840  }
1841 
1842  /* rename to permanent file, fsync file and directory */
1843  if (rename(tmppath, path) != 0)
1844  {
1845  int save_errno = errno;
1846 
1848  errno = save_errno;
1849  ereport(elevel,
1851  errmsg("could not rename file \"%s\" to \"%s\": %m",
1852  tmppath, path)));
1853  return;
1854  }
1855 
1856  /*
1857  * Check CreateSlotOnDisk() for the reasoning of using a critical section.
1858  */
1860 
1861  fsync_fname(path, false);
1862  fsync_fname(dir, true);
1863  fsync_fname("pg_replslot", true);
1864 
1865  END_CRIT_SECTION();
1866 
1867  /*
1868  * Successfully wrote, unset dirty bit, unless somebody dirtied again
1869  * already.
1870  */
1871  SpinLockAcquire(&slot->mutex);
1872  if (!slot->just_dirtied)
1873  slot->dirty = false;
1874  SpinLockRelease(&slot->mutex);
1875 
1877 }
1878 
1879 /*
1880  * Load a single slot from disk into memory.
1881  */
1882 static void
1884 {
1886  int i;
1887  char slotdir[MAXPGPATH + 12];
1888  char path[MAXPGPATH + 22];
1889  int fd;
1890  bool restored = false;
1891  int readBytes;
1892  pg_crc32c checksum;
1893 
1894  /* no need to lock here, no concurrent access allowed yet */
1895 
1896  /* delete temp file if it exists */
1897  sprintf(slotdir, "pg_replslot/%s", name);
1898  sprintf(path, "%s/state.tmp", slotdir);
1899  if (unlink(path) < 0 && errno != ENOENT)
1900  ereport(PANIC,
1902  errmsg("could not remove file \"%s\": %m", path)));
1903 
1904  sprintf(path, "%s/state", slotdir);
1905 
1906  elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1907 
1908  /* on some operating systems fsyncing a file requires O_RDWR */
1909  fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1910 
1911  /*
1912  * We do not need to handle this as we are rename()ing the directory into
1913  * place only after we fsync()ed the state file.
1914  */
1915  if (fd < 0)
1916  ereport(PANIC,
1918  errmsg("could not open file \"%s\": %m", path)));
1919 
1920  /*
1921  * Sync state file before we're reading from it. We might have crashed
1922  * while it wasn't synced yet and we shouldn't continue on that basis.
1923  */
1925  if (pg_fsync(fd) != 0)
1926  ereport(PANIC,
1928  errmsg("could not fsync file \"%s\": %m",
1929  path)));
1931 
1932  /* Also sync the parent directory */
1934  fsync_fname(slotdir, true);
1935  END_CRIT_SECTION();
1936 
1937  /* read part of statefile that's guaranteed to be version independent */
1939  readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1941  if (readBytes != ReplicationSlotOnDiskConstantSize)
1942  {
1943  if (readBytes < 0)
1944  ereport(PANIC,
1946  errmsg("could not read file \"%s\": %m", path)));
1947  else
1948  ereport(PANIC,
1950  errmsg("could not read file \"%s\": read %d of %zu",
1951  path, readBytes,
1953  }
1954 
1955  /* verify magic */
1956  if (cp.magic != SLOT_MAGIC)
1957  ereport(PANIC,
1959  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1960  path, cp.magic, SLOT_MAGIC)));
1961 
1962  /* verify version */
1963  if (cp.version != SLOT_VERSION)
1964  ereport(PANIC,
1966  errmsg("replication slot file \"%s\" has unsupported version %u",
1967  path, cp.version)));
1968 
1969  /* boundary check on length */
1971  ereport(PANIC,
1973  errmsg("replication slot file \"%s\" has corrupted length %u",
1974  path, cp.length)));
1975 
1976  /* Now that we know the size, read the entire file */
1978  readBytes = read(fd,
1979  (char *) &cp + ReplicationSlotOnDiskConstantSize,
1980  cp.length);
1982  if (readBytes != cp.length)
1983  {
1984  if (readBytes < 0)
1985  ereport(PANIC,
1987  errmsg("could not read file \"%s\": %m", path)));
1988  else
1989  ereport(PANIC,
1991  errmsg("could not read file \"%s\": read %d of %zu",
1992  path, readBytes, (Size) cp.length)));
1993  }
1994 
1995  if (CloseTransientFile(fd) != 0)
1996  ereport(PANIC,
1998  errmsg("could not close file \"%s\": %m", path)));
1999 
2000  /* now verify the CRC */
2001  INIT_CRC32C(checksum);
2002  COMP_CRC32C(checksum,
2005  FIN_CRC32C(checksum);
2006 
2007  if (!EQ_CRC32C(checksum, cp.checksum))
2008  ereport(PANIC,
2009  (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2010  path, checksum, cp.checksum)));
2011 
2012  /*
2013  * If we crashed with an ephemeral slot active, don't restore but delete
2014  * it.
2015  */
2017  {
2018  if (!rmtree(slotdir, true))
2019  {
2020  ereport(WARNING,
2021  (errmsg("could not remove directory \"%s\"",
2022  slotdir)));
2023  }
2024  fsync_fname("pg_replslot", true);
2025  return;
2026  }
2027 
2028  /*
2029  * Verify that requirements for the specific slot type are met. That's
2030  * important because if these aren't met we're not guaranteed to retain
2031  * all the necessary resources for the slot.
2032  *
2033  * NB: We have to do so *after* the above checks for ephemeral slots,
2034  * because otherwise a slot that shouldn't exist anymore could prevent
2035  * restarts.
2036  *
2037  * NB: Changing the requirements here also requires adapting
2038  * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2039  */
2041  ereport(FATAL,
2042  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2043  errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
2044  NameStr(cp.slotdata.name)),
2045  errhint("Change wal_level to be logical or higher.")));
2046  else if (wal_level < WAL_LEVEL_REPLICA)
2047  ereport(FATAL,
2048  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2049  errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
2050  NameStr(cp.slotdata.name)),
2051  errhint("Change wal_level to be replica or higher.")));
2052 
2053  /* nothing can be active yet, don't lock anything */
2054  for (i = 0; i < max_replication_slots; i++)
2055  {
2056  ReplicationSlot *slot;
2057 
2059 
2060  if (slot->in_use)
2061  continue;
2062 
2063  /* restore the entire set of persistent data */
2064  memcpy(&slot->data, &cp.slotdata,
2066 
2067  /* initialize in memory state */
2068  slot->effective_xmin = cp.slotdata.xmin;
2070 
2075 
2076  slot->in_use = true;
2077  slot->active_pid = 0;
2078 
2079  restored = true;
2080  break;
2081  }
2082 
2083  if (!restored)
2084  ereport(FATAL,
2085  (errmsg("too many replication slots active before shutdown"),
2086  errhint("Increase max_replication_slots and try again.")));
2087 }
#define InvalidBackendId
Definition: backendid.h:23
#define NameStr(name)
Definition: c.h:730
unsigned int uint32
Definition: c.h:490
#define PG_BINARY
Definition: c.h:1278
#define pg_unreachable()
Definition: c.h:280
#define MemSet(start, val, len)
Definition: c.h:1004
uint32 TransactionId
Definition: c.h:636
size_t Size
Definition: c.h:589
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
void ConditionVariableCancelSleep(void)
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1229
int errcode_for_file_access(void)
Definition: elog.c:881
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define _(x)
Definition: elog.c:91
#define LOG
Definition: elog.h:31
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
const char * name
Definition: encode.c:571
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2806
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3810
int FreeDir(DIR *dir)
Definition: fd.c:2858
int CloseTransientFile(int fd)
Definition: fd.c:2706
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:667
int pg_fsync(int fd)
Definition: fd.c:361
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2530
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2740
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:406
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
int MyProcPid
Definition: globals.c:44
bool IsUnderPostmaster
Definition: globals.c:113
Oid MyDatabaseId
Definition: globals.c:89
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1919
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1963
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:730
@ LWTRANCHE_REPLICATION_SLOT_IO
Definition: lwlock.h:191
@ LW_SHARED
Definition: lwlock.h:117
@ LW_EXCLUSIVE
Definition: lwlock.h:116
void pfree(void *pointer)
Definition: mcxt.c:1456
#define START_CRIT_SECTION()
Definition: miscadmin.h:148
@ B_STARTUP
Definition: miscadmin.h:338
#define END_CRIT_SECTION()
Definition: miscadmin.h:150
Oid GetUserId(void)
Definition: miscinit.c:510
BackendType MyBackendType
Definition: miscinit.c:63
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:707
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
static bool two_phase
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PROC_IN_LOGICAL_DECODING
Definition: proc.h:60
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3876
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition: procsignal.h:45
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:375
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:408
#define ReplicationSlotOnDiskChecksummedSize
Definition: slot.c:86
void ReplicationSlotCleanup(void)
Definition: slot.c:604
void ReplicationSlotMarkDirty(void)
Definition: slot.c:797
void ReplicationSlotReserveWal(void)
Definition: slot.c:1174
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:999
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:451
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:1518
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1057
#define ReplicationSlotOnDiskNotChecksummedSize
Definition: slot.c:83
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:941
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:836
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:1883
void ReplicationSlotPersist(void)
Definition: slot.c:814
ReplicationSlot * MyReplicationSlot
Definition: slot.c:99
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:1729
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:642
void ReplicationSlotSave(void)
Definition: slot.c:779
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:1668
#define ReplicationSlotOnDiskV2Size
Definition: slot.c:89
void CheckSlotPermissions(void)
Definition: slot.c:1157
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:424
void ReplicationSlotsShmemInit(void)
Definition: slot.c:136
void ReplicationSlotRelease(void)
Definition: slot.c:548
int max_replication_slots
Definition: slot.c:102
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:96
#define SLOT_VERSION
Definition: slot.c:93
struct ReplicationSlotOnDisk ReplicationSlotOnDisk
void CheckPointReplicationSlots(void)
Definition: slot.c:1572
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:892
void ReplicationSlotInitialize(void)
Definition: slot.c:171
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:672
static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
Definition: slot.c:1309
void StartupReplicationSlots(void)
Definition: slot.c:1607
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:253
void CheckSlotRequirements(void)
Definition: slot.c:1135
#define SLOT_MAGIC
Definition: slot.c:92
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon)
Definition: slot.c:1250
static void ReplicationSlotDropAcquired(void)
Definition: slot.c:655
#define ReplicationSlotOnDiskConstantSize
Definition: slot.c:80
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:118
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:199
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:180
ReplicationSlotPersistency
Definition: slot.h:34
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
#define SlotIsPhysical(slot)
Definition: slot.h:183
ReplicationSlotInvalidationCause
Definition: slot.h:45
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:48
@ RS_INVAL_HORIZON
Definition: slot.h:50
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:52
@ RS_INVAL_NONE
Definition: slot.h:46
#define SlotIsLogical(slot)
Definition: slot.h:184
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1278
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:32
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: dirent.c:26
uint8 statusFlags
Definition: proc.h:233
int pgxactoff
Definition: proc.h:188
uint8 * statusFlags
Definition: proc.h:377
ReplicationSlot replication_slots[1]
Definition: slot.h:195
uint32 version
Definition: slot.c:68
ReplicationSlotPersistentData slotdata
Definition: slot.c:76
pg_crc32c checksum
Definition: slot.c:65
TransactionId xmin
Definition: slot.h:77
TransactionId catalog_xmin
Definition: slot.h:85
XLogRecPtr restart_lsn
Definition: slot.h:88
ReplicationSlotPersistency persistency
Definition: slot.h:69
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:91
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:178
TransactionId effective_catalog_xmin
Definition: slot.h:159
slock_t mutex
Definition: slot.h:135
XLogRecPtr candidate_restart_valid
Definition: slot.h:179
pid_t active_pid
Definition: slot.h:141
bool in_use
Definition: slot.h:138
TransactionId effective_xmin
Definition: slot.h:158
bool just_dirtied
Definition: slot.h:144
XLogRecPtr candidate_restart_lsn
Definition: slot.h:180
LWLock io_in_progress_lock
Definition: slot.h:165
ConditionVariable active_cv
Definition: slot.h:168
TransactionId candidate_catalog_xmin
Definition: slot.h:177
bool dirty
Definition: slot.h:145
ReplicationSlotPersistentData data
Definition: slot.h:162
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
Definition: type.h:95
Definition: c.h:725
unsigned short st_mode
Definition: win32_port.h:276
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
@ WAIT_EVENT_REPLICATION_SLOT_READ
Definition: wait_event.h:206
@ WAIT_EVENT_REPLICATION_SLOT_WRITE
Definition: wait_event.h:209
@ WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC
Definition: wait_event.h:207
@ WAIT_EVENT_REPLICATION_SLOT_SYNC
Definition: wait_event.h:208
@ WAIT_EVENT_REPLICATION_SLOT_DROP
Definition: wait_event.h:128
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:271
static void pgstat_report_wait_end(void)
Definition: wait_event.h:287
#define stat
Definition: win32_port.h:292
#define S_ISDIR(m)
Definition: win32_port.h:333
#define kill(pid, sig)
Definition: win32_port.h:495
bool RecoveryInProgress(void)
Definition: xlog.c:5921
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3469
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6024
int wal_level
Definition: xlog.c:134
int wal_segment_size
Definition: xlog.c:146
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2399
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:8896
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2513
@ WAL_LEVEL_REPLICA
Definition: xlog.h:70
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:71
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)