PostgreSQL Source Code  git master
slot.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  * Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster. Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups). The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot. The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
36 
37 #include "postgres.h"
38 
39 #include <unistd.h>
40 #include <sys/stat.h>
41 
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "access/xlogrecovery.h"
45 #include "common/file_utils.h"
46 #include "common/string.h"
47 #include "miscadmin.h"
48 #include "pgstat.h"
49 #include "replication/slot.h"
50 #include "storage/fd.h"
51 #include "storage/ipc.h"
52 #include "storage/proc.h"
53 #include "storage/procarray.h"
54 #include "utils/builtins.h"
55 
56 /*
57  * Replication slot on-disk data structure.
58  */
59 typedef struct ReplicationSlotOnDisk
60 {
61  /* first part of this struct needs to be version independent */
62 
63  /* data not covered by checksum */
66 
67  /* data covered by checksum */
70 
71  /*
72  * The actual data in the slot that follows can differ based on the above
73  * 'version'.
74  */
75 
78 
/*
 * Size helpers for ReplicationSlotOnDisk.
 *
 * The replacement lists of the derived sizes are fully parenthesized so that
 * the macros behave as single values inside larger expressions (without the
 * parentheses, "2 * ReplicationSlotOnDiskV2Size" would expand to
 * "2 * sizeof(...) - offsetof(...)").
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	3		/* version for new files */
94 
95 /* Control array for replication slot management */
97 
98 /* My backend's replication slot in the shared memory array */
100 
101 /* GUC variable */
102 int max_replication_slots = 10; /* the maximum number of replication
103  * slots */
104 
105 static void ReplicationSlotShmemExit(int code, Datum arg);
106 static void ReplicationSlotDropAcquired(void);
107 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
108 
109 /* internal persistency functions */
110 static void RestoreSlotFromDisk(const char *name);
111 static void CreateSlotOnDisk(ReplicationSlot *slot);
112 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
113 
114 /*
115  * Report shared-memory space needed by ReplicationSlotsShmemInit.
116  */
117 Size
119 {
120  Size size = 0;
121 
122  if (max_replication_slots == 0)
123  return size;
124 
125  size = offsetof(ReplicationSlotCtlData, replication_slots);
126  size = add_size(size,
128 
129  return size;
130 }
131 
132 /*
133  * Allocate and initialize shared memory for replication slots.
134  */
135 void
137 {
138  bool found;
139 
140  if (max_replication_slots == 0)
141  return;
142 
144  ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
145  &found);
146 
147  if (!found)
148  {
149  int i;
150 
151  /* First time through, so initialize */
153 
154  for (i = 0; i < max_replication_slots; i++)
155  {
157 
158  /* everything else is zeroed by the memset above */
159  SpinLockInit(&slot->mutex);
163  }
164  }
165 }
166 
167 /*
168  * Register the callback for replication slot cleanup and releasing.
169  */
170 void
172 {
174 }
175 
176 /*
177  * Release and cleanup replication slots.
178  */
179 static void
181 {
182  /* Make sure active replication slots are released */
183  if (MyReplicationSlot != NULL)
185 
186  /* Also cleanup all the temporary slots. */
188 }
189 
190 /*
191  * Check whether the passed slot name is valid and report errors at elevel.
192  *
193  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
194  * the name to be used as a directory name on every supported OS.
195  *
196  * Returns whether the directory name is valid or not if elevel < ERROR.
197  */
198 bool
199 ReplicationSlotValidateName(const char *name, int elevel)
200 {
201  const char *cp;
202 
203  if (strlen(name) == 0)
204  {
205  ereport(elevel,
206  (errcode(ERRCODE_INVALID_NAME),
207  errmsg("replication slot name \"%s\" is too short",
208  name)));
209  return false;
210  }
211 
212  if (strlen(name) >= NAMEDATALEN)
213  {
214  ereport(elevel,
215  (errcode(ERRCODE_NAME_TOO_LONG),
216  errmsg("replication slot name \"%s\" is too long",
217  name)));
218  return false;
219  }
220 
221  for (cp = name; *cp; cp++)
222  {
223  if (!((*cp >= 'a' && *cp <= 'z')
224  || (*cp >= '0' && *cp <= '9')
225  || (*cp == '_')))
226  {
227  ereport(elevel,
228  (errcode(ERRCODE_INVALID_NAME),
229  errmsg("replication slot name \"%s\" contains invalid character",
230  name),
231  errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
232  return false;
233  }
234  }
235  return true;
236 }
237 
238 /*
239  * Create a new replication slot and mark it as used by this backend.
240  *
241  * name: Name of the slot
242  * db_specific: logical decoding is db specific; if the slot is going to
243  * be used for that pass true, otherwise false.
244  * two_phase: Allows decoding of prepared transactions. We allow this option
245  * to be enabled only at the slot creation time. If we allow this option
246  * to be changed during decoding then it is quite possible that we skip
247  * prepare first time because this option was not enabled. Now next time
248  * during getting changes, if the two_phase option is enabled it can skip
249  * prepare because by that time start decoding point has been moved. So the
250  * user will only get commit prepared.
251  */
252 void
253 ReplicationSlotCreate(const char *name, bool db_specific,
254  ReplicationSlotPersistency persistency, bool two_phase)
255 {
256  ReplicationSlot *slot = NULL;
257  int i;
258 
259  Assert(MyReplicationSlot == NULL);
260 
262 
263  /*
264  * If some other backend ran this code concurrently with us, we'd likely
265  * both allocate the same slot, and that would be bad. We'd also be at
266  * risk of missing a name collision. Also, we don't want to try to create
267  * a new slot while somebody's busy cleaning up an old one, because we
268  * might both be monkeying with the same directory.
269  */
270  LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
271 
272  /*
273  * Check for name collision, and identify an allocatable slot. We need to
274  * hold ReplicationSlotControlLock in shared mode for this, so that nobody
275  * else can change the in_use flags while we're looking at them.
276  */
277  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
278  for (i = 0; i < max_replication_slots; i++)
279  {
281 
282  if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
283  ereport(ERROR,
285  errmsg("replication slot \"%s\" already exists", name)));
286  if (!s->in_use && slot == NULL)
287  slot = s;
288  }
289  LWLockRelease(ReplicationSlotControlLock);
290 
291  /* If all slots are in use, we're out of luck. */
292  if (slot == NULL)
293  ereport(ERROR,
294  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
295  errmsg("all replication slots are in use"),
296  errhint("Free one or increase max_replication_slots.")));
297 
298  /*
299  * Since this slot is not in use, nobody should be looking at any part of
300  * it other than the in_use field unless they're trying to allocate it.
301  * And since we hold ReplicationSlotAllocationLock, nobody except us can
302  * be doing that. So it's safe to initialize the slot.
303  */
304  Assert(!slot->in_use);
305  Assert(slot->active_pid == 0);
306 
307  /* first initialize persistent data */
308  memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
309  namestrcpy(&slot->data.name, name);
310  slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
311  slot->data.persistency = persistency;
312  slot->data.two_phase = two_phase;
314 
315  /* and then data only present in shared memory */
316  slot->just_dirtied = false;
317  slot->dirty = false;
325 
326  /*
327  * Create the slot on disk. We haven't actually marked the slot allocated
328  * yet, so no special cleanup is required if this errors out.
329  */
330  CreateSlotOnDisk(slot);
331 
332  /*
333  * We need to briefly prevent any other backend from iterating over the
334  * slots while we flip the in_use flag. We also need to set the active
335  * flag while holding the ControlLock as otherwise a concurrent
336  * ReplicationSlotAcquire() could acquire the slot as well.
337  */
338  LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
339 
340  slot->in_use = true;
341 
342  /* We can now mark the slot active, and that makes it our slot. */
343  SpinLockAcquire(&slot->mutex);
344  Assert(slot->active_pid == 0);
345  slot->active_pid = MyProcPid;
346  SpinLockRelease(&slot->mutex);
347  MyReplicationSlot = slot;
348 
349  LWLockRelease(ReplicationSlotControlLock);
350 
351  /*
352  * Create statistics entry for the new logical slot. We don't collect any
353  * stats for physical slots, so no need to create an entry for the same.
354  * See ReplicationSlotDropPtr for why we need to do this before releasing
355  * ReplicationSlotAllocationLock.
356  */
357  if (SlotIsLogical(slot))
359 
360  /*
361  * Now that the slot has been marked as in_use and active, it's safe to
362  * let somebody else try to allocate a slot.
363  */
364  LWLockRelease(ReplicationSlotAllocationLock);
365 
366  /* Let everybody know we've modified this slot */
368 }
369 
370 /*
371  * Search for the named replication slot.
372  *
373  * Return the replication slot if found, otherwise NULL.
374  */
376 SearchNamedReplicationSlot(const char *name, bool need_lock)
377 {
378  int i;
379  ReplicationSlot *slot = NULL;
380 
381  if (need_lock)
382  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
383 
384  for (i = 0; i < max_replication_slots; i++)
385  {
387 
388  if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
389  {
390  slot = s;
391  break;
392  }
393  }
394 
395  if (need_lock)
396  LWLockRelease(ReplicationSlotControlLock);
397 
398  return slot;
399 }
400 
401 /*
402  * Return the index of the replication slot in
403  * ReplicationSlotCtl->replication_slots.
404  *
405  * This is mainly useful to have an efficient key for storing replication slot
406  * stats.
407  */
408 int
410 {
412  slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
413 
414  return slot - ReplicationSlotCtl->replication_slots;
415 }
416 
417 /*
418  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
419  * the slot's name and true is returned.
420  *
421  * This likely is only useful for pgstat_replslot.c during shutdown, in other
422  * cases there are obvious TOCTOU issues.
423  */
424 bool
426 {
427  ReplicationSlot *slot;
428  bool found;
429 
431 
432  /*
433  * Ensure that the slot cannot be dropped while we copy the name. Don't
434  * need the spinlock as the name of an existing slot cannot change.
435  */
436  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
437  found = slot->in_use;
438  if (slot->in_use)
439  namestrcpy(name, NameStr(slot->data.name));
440  LWLockRelease(ReplicationSlotControlLock);
441 
442  return found;
443 }
444 
445 /*
446  * Find a previously created slot and mark it as used by this process.
447  * An error is raised if no slot with the given name exists.
448  * If nowait is true, an error is also raised when the slot is currently in
449  * use; if nowait is false, we sleep until the owning process releases it.
450  */
451 void
452 ReplicationSlotAcquire(const char *name, bool nowait)
453 {
454  ReplicationSlot *s;
455  int active_pid;
456 
457  Assert(name != NULL);
458 
459 retry:
460  Assert(MyReplicationSlot == NULL);
461 
462  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
463 
464  /*
465  * Search for the slot with the specified name. If no such slot is in
466  * use, release the lock and error out.
467  */
468  s = SearchNamedReplicationSlot(name, false);
469  if (s == NULL || !s->in_use)
470  {
471  LWLockRelease(ReplicationSlotControlLock);
472 
473  ereport(ERROR,
474  (errcode(ERRCODE_UNDEFINED_OBJECT),
475  errmsg("replication slot \"%s\" does not exist",
476  name)));
477  }
478 
479  /*
480  * This is the slot we want; check if it's active under some other
481  * process. In single user mode, we don't need this check.
482  */
483  if (IsUnderPostmaster)
484  {
485  /*
486  * Get ready to sleep on the slot in case it is active. (We may end
487  * up not sleeping, but we don't want to do this while holding the
488  * spinlock.)
489  */
490  if (!nowait)
492 
493  SpinLockAcquire(&s->mutex);
494  if (s->active_pid == 0)
495  s->active_pid = MyProcPid;
496  active_pid = s->active_pid;
497  SpinLockRelease(&s->mutex);
498  }
499  else
500  active_pid = MyProcPid;
501  LWLockRelease(ReplicationSlotControlLock);
502 
503  /*
504  * If we found the slot but it's already active in another process, we
505  * wait until the owning process signals us that it's been released, or
506  * error out.
507  */
508  if (active_pid != MyProcPid)
509  {
510  if (!nowait)
511  {
512  /* Wait here until we get signaled, and then restart */
514  WAIT_EVENT_REPLICATION_SLOT_DROP);
516  goto retry;
517  }
518 
519  ereport(ERROR,
520  (errcode(ERRCODE_OBJECT_IN_USE),
521  errmsg("replication slot \"%s\" is active for PID %d",
522  NameStr(s->data.name), active_pid)));
523  }
524  else if (!nowait)
525  ConditionVariableCancelSleep(); /* no sleep needed after all */
526 
527  /* Let everybody know we've modified this slot */
529 
530  /* We made this slot active, so it's ours now. */
531  MyReplicationSlot = s;
532 
533  /*
534  * The call to pgstat_acquire_replslot() protects against stats for a
535  * different slot, from before a restart or such, being present during
536  * pgstat_report_replslot().
537  */
538  if (SlotIsLogical(s))
540 }
541 
542 /*
543  * Release the replication slot that this backend considers to own.
544  *
545  * This or another backend can re-acquire the slot later.
546  * Resources this slot requires will be preserved.
547  */
548 void
550 {
552 
553  Assert(slot != NULL && slot->active_pid != 0);
554 
555  if (slot->data.persistency == RS_EPHEMERAL)
556  {
557  /*
558  * Delete the slot. There is no !PANIC case where this is allowed to
559  * fail, all that may happen is an incomplete cleanup of the on-disk
560  * data.
561  */
563  }
564 
565  /*
566  * If slot needed to temporarily restrain both data and catalog xmin to
567  * create the catalog snapshot, remove that temporary constraint.
568  * Snapshots can only be exported while the initial snapshot is still
569  * acquired.
570  */
571  if (!TransactionIdIsValid(slot->data.xmin) &&
573  {
574  SpinLockAcquire(&slot->mutex);
576  SpinLockRelease(&slot->mutex);
578  }
579 
580  if (slot->data.persistency == RS_PERSISTENT)
581  {
582  /*
583  * Mark persistent slot inactive. We're not freeing it, just
584  * disconnecting, but wake up others that may be waiting for it.
585  */
586  SpinLockAcquire(&slot->mutex);
587  slot->active_pid = 0;
588  SpinLockRelease(&slot->mutex);
590  }
591 
592  MyReplicationSlot = NULL;
593 
594  /* might not have been set when we've been a plain slot */
595  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
598  LWLockRelease(ProcArrayLock);
599 }
600 
601 /*
602  * Cleanup all temporary slots created in current session.
603  */
604 void
606 {
607  int i;
608 
609  Assert(MyReplicationSlot == NULL);
610 
611 restart:
612  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
613  for (i = 0; i < max_replication_slots; i++)
614  {
616 
617  if (!s->in_use)
618  continue;
619 
620  SpinLockAcquire(&s->mutex);
621  if (s->active_pid == MyProcPid)
622  {
624  SpinLockRelease(&s->mutex);
625  LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
626 
628 
630  goto restart;
631  }
632  else
633  SpinLockRelease(&s->mutex);
634  }
635 
636  LWLockRelease(ReplicationSlotControlLock);
637 }
638 
639 /*
640  * Permanently drop replication slot identified by the passed in name.
641  */
642 void
643 ReplicationSlotDrop(const char *name, bool nowait)
644 {
645  Assert(MyReplicationSlot == NULL);
646 
647  ReplicationSlotAcquire(name, nowait);
648 
650 }
651 
652 /*
653  * Permanently drop the currently acquired replication slot.
654  */
655 static void
657 {
659 
660  Assert(MyReplicationSlot != NULL);
661 
662  /* slot isn't acquired anymore */
663  MyReplicationSlot = NULL;
664 
666 }
667 
668 /*
669  * Permanently drop the replication slot which will be released by the point
670  * this function returns.
671  */
672 static void
674 {
675  char path[MAXPGPATH];
676  char tmppath[MAXPGPATH];
677 
678  /*
679  * If some other backend ran this code concurrently with us, we might try
680  * to delete a slot with a certain name while someone else was trying to
681  * create a slot with the same name.
682  */
683  LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
684 
685  /* Generate pathnames. */
686  sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
687  sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
688 
689  /*
690  * Rename the slot directory on disk, so that we'll no longer recognize
691  * this as a valid slot. Note that if this fails, we've got to mark the
692  * slot inactive before bailing out. If we're dropping an ephemeral or a
693  * temporary slot, we better never fail hard as the caller won't expect
694  * the slot to survive and this might get called during error handling.
695  */
696  if (rename(path, tmppath) == 0)
697  {
698  /*
699  * We need to fsync() the directory we just renamed and its parent to
700  * make sure that our changes are on disk in a crash-safe fashion. If
701  * fsync() fails, we can't be sure whether the changes are on disk or
702  * not. For now, we handle that by panicking;
703  * StartupReplicationSlots() will try to straighten it out after
704  * restart.
705  */
707  fsync_fname(tmppath, true);
708  fsync_fname("pg_replslot", true);
710  }
711  else
712  {
713  bool fail_softly = slot->data.persistency != RS_PERSISTENT;
714 
715  SpinLockAcquire(&slot->mutex);
716  slot->active_pid = 0;
717  SpinLockRelease(&slot->mutex);
718 
719  /* wake up anyone waiting on this slot */
721 
722  ereport(fail_softly ? WARNING : ERROR,
724  errmsg("could not rename file \"%s\" to \"%s\": %m",
725  path, tmppath)));
726  }
727 
728  /*
729  * The slot is definitely gone. Lock out concurrent scans of the array
730  * long enough to kill it. It's OK to clear the active PID here without
731  * grabbing the mutex because nobody else can be scanning the array here,
732  * and nobody can be attached to this slot and thus access it without
733  * scanning the array.
734  *
735  * Also wake up processes waiting for it.
736  */
737  LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
738  slot->active_pid = 0;
739  slot->in_use = false;
740  LWLockRelease(ReplicationSlotControlLock);
742 
743  /*
744  * Slot is dead and doesn't prevent resource removal anymore, recompute
745  * limits.
746  */
749 
750  /*
751  * If removing the directory fails, the worst thing that will happen is
752  * that the user won't be able to create a new slot with the same name
753  * until the next server restart. We warn about it, but that's all.
754  */
755  if (!rmtree(tmppath, true))
757  (errmsg("could not remove directory \"%s\"", tmppath)));
758 
759  /*
760  * Drop the statistics entry for the replication slot. Do this while
761  * holding ReplicationSlotAllocationLock so that we don't drop a
762  * statistics entry for another slot with the same name just created in
763  * another session.
764  */
765  if (SlotIsLogical(slot))
766  pgstat_drop_replslot(slot);
767 
768  /*
769  * We release this at the very end, so that nobody starts trying to create
770  * a slot while we're still cleaning up the detritus of the old one.
771  */
772  LWLockRelease(ReplicationSlotAllocationLock);
773 }
774 
775 /*
776  * Serialize the currently acquired slot's state from memory to disk, thereby
777  * guaranteeing the current state will survive a crash.
778  */
779 void
781 {
782  char path[MAXPGPATH];
783 
784  Assert(MyReplicationSlot != NULL);
785 
786  sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
788 }
789 
790 /*
791  * Signal that it would be useful if the currently acquired slot would be
792  * flushed out to disk.
793  *
794  * Note that the actual flush to disk can be delayed for a long time; if
795  * durability is required for correctness, call ReplicationSlotSave().
796  */
797 void
799 {
801 
802  Assert(MyReplicationSlot != NULL);
803 
804  SpinLockAcquire(&slot->mutex);
806  MyReplicationSlot->dirty = true;
807  SpinLockRelease(&slot->mutex);
808 }
809 
810 /*
811  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
812  * guaranteeing it will be there after an eventual crash.
813  */
814 void
816 {
818 
819  Assert(slot != NULL);
821 
822  SpinLockAcquire(&slot->mutex);
824  SpinLockRelease(&slot->mutex);
825 
828 }
829 
830 /*
831  * Compute the oldest xmin across all slots and store it in the ProcArray.
832  *
833  * If already_locked is true, ProcArrayLock has already been acquired
834  * exclusively.
835  */
836 void
838 {
839  int i;
841  TransactionId agg_catalog_xmin = InvalidTransactionId;
842 
843  Assert(ReplicationSlotCtl != NULL);
844 
845  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
846 
847  for (i = 0; i < max_replication_slots; i++)
848  {
850  TransactionId effective_xmin;
851  TransactionId effective_catalog_xmin;
852  bool invalidated;
853 
854  if (!s->in_use)
855  continue;
856 
857  SpinLockAcquire(&s->mutex);
858  effective_xmin = s->effective_xmin;
859  effective_catalog_xmin = s->effective_catalog_xmin;
860  invalidated = s->data.invalidated != RS_INVAL_NONE;
861  SpinLockRelease(&s->mutex);
862 
863  /* invalidated slots need not apply */
864  if (invalidated)
865  continue;
866 
867  /* check the data xmin */
868  if (TransactionIdIsValid(effective_xmin) &&
869  (!TransactionIdIsValid(agg_xmin) ||
870  TransactionIdPrecedes(effective_xmin, agg_xmin)))
871  agg_xmin = effective_xmin;
872 
873  /* check the catalog xmin */
874  if (TransactionIdIsValid(effective_catalog_xmin) &&
875  (!TransactionIdIsValid(agg_catalog_xmin) ||
876  TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
877  agg_catalog_xmin = effective_catalog_xmin;
878  }
879 
880  LWLockRelease(ReplicationSlotControlLock);
881 
882  ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
883 }
884 
885 /*
886  * Compute the oldest restart LSN across all slots and inform xlog module.
887  *
888  * Note: while max_slot_wal_keep_size is theoretically relevant for this
889  * purpose, we don't try to account for that, because this module doesn't
890  * know what to compare against.
891  */
892 void
894 {
895  int i;
896  XLogRecPtr min_required = InvalidXLogRecPtr;
897 
898  Assert(ReplicationSlotCtl != NULL);
899 
900  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
901  for (i = 0; i < max_replication_slots; i++)
902  {
904  XLogRecPtr restart_lsn;
905  bool invalidated;
906 
907  if (!s->in_use)
908  continue;
909 
910  SpinLockAcquire(&s->mutex);
911  restart_lsn = s->data.restart_lsn;
912  invalidated = s->data.invalidated != RS_INVAL_NONE;
913  SpinLockRelease(&s->mutex);
914 
915  /* invalidated slots need not apply */
916  if (invalidated)
917  continue;
918 
919  if (restart_lsn != InvalidXLogRecPtr &&
920  (min_required == InvalidXLogRecPtr ||
921  restart_lsn < min_required))
922  min_required = restart_lsn;
923  }
924  LWLockRelease(ReplicationSlotControlLock);
925 
926  XLogSetReplicationSlotMinimumLSN(min_required);
927 }
928 
929 /*
930  * Compute the oldest WAL LSN required by *logical* decoding slots.
931  *
932  * Returns InvalidXLogRecPtr if logical decoding is disabled or no valid
933  * (non-invalidated) logical slots exist.
934  *
935  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
936  * ignores physical replication slots.
937  *
938  * The results aren't required frequently, so we don't maintain a precomputed
939  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
940  */
943 {
944  XLogRecPtr result = InvalidXLogRecPtr;
945  int i;
946 
947  if (max_replication_slots <= 0)
948  return InvalidXLogRecPtr;
949 
950  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
951 
952  for (i = 0; i < max_replication_slots; i++)
953  {
954  ReplicationSlot *s;
955  XLogRecPtr restart_lsn;
956  bool invalidated;
957 
959 
960  /* in_use cannot change while ReplicationSlotControlLock is held */
961  if (!s->in_use)
962  continue;
963 
964  /* we're only interested in logical slots */
965  if (!SlotIsLogical(s))
966  continue;
967 
968  /* read once, it's ok if it increases while we're checking */
969  SpinLockAcquire(&s->mutex);
970  restart_lsn = s->data.restart_lsn;
971  invalidated = s->data.invalidated != RS_INVAL_NONE;
972  SpinLockRelease(&s->mutex);
973 
974  /* invalidated slots need not apply */
975  if (invalidated)
976  continue;
977 
978  if (restart_lsn == InvalidXLogRecPtr)
979  continue;
980 
981  if (result == InvalidXLogRecPtr ||
982  restart_lsn < result)
983  result = restart_lsn;
984  }
985 
986  LWLockRelease(ReplicationSlotControlLock);
987 
988  return result;
989 }
990 
991 /*
992  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
993  * passed database oid.
994  *
995  * Returns true if any logical slots reference the database. *nslots is set
996  * to the total number of such slots (invalidated ones included), *nactive
997  * to the number of them that are currently active (active_pid != 0).
998  */
999 bool
1000 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1001 {
1002  int i;
1003 
1004  *nslots = *nactive = 0;
1005 
1006  if (max_replication_slots <= 0)
1007  return false;
1008 
1009  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1010  for (i = 0; i < max_replication_slots; i++)
1011  {
1012  ReplicationSlot *s;
1013 
1015 
1016  /* in_use cannot change while ReplicationSlotControlLock is held */
1017  if (!s->in_use)
1018  continue;
1019 
1020  /* only logical slots are database specific, skip */
1021  if (!SlotIsLogical(s))
1022  continue;
1023 
1024  /* not our database, skip */
1025  if (s->data.database != dboid)
1026  continue;
1027 
1028  /* NB: intentionally counting invalidated slots */
1029 
1030  /* count slots with spinlock held */
1031  SpinLockAcquire(&s->mutex);
1032  (*nslots)++;
1033  if (s->active_pid != 0)
1034  (*nactive)++;
1035  SpinLockRelease(&s->mutex);
1036  }
1037  LWLockRelease(ReplicationSlotControlLock);
1038 
1039  if (*nslots > 0)
1040  return true;
1041  return false;
1042 }
1043 
1044 /*
1045  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1046  * passed database oid. The caller should hold an exclusive lock on the
1047  * pg_database oid for the database to prevent creation of new slots on the db
1048  * or replay from existing slots.
1049  *
1050  * Another session that concurrently acquires an existing slot on the target DB
1051  * (most likely to drop it) may cause this function to ERROR. If that happens
1052  * it may have dropped some but not all slots.
1053  *
1054  * This routine isn't as efficient as it could be - but we don't drop
1055  * databases often, especially databases with lots of slots.
1056  */
1057 void
1059 {
1060  int i;
1061 
1062  if (max_replication_slots <= 0)
1063  return;
1064 
1065 restart:
1066  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1067  for (i = 0; i < max_replication_slots; i++)
1068  {
1069  ReplicationSlot *s;
1070  char *slotname;
1071  int active_pid;
1072 
1074 
1075  /* in_use cannot change while ReplicationSlotControlLock is held */
1076  if (!s->in_use)
1077  continue;
1078 
1079  /* only logical slots are database specific, skip */
1080  if (!SlotIsLogical(s))
1081  continue;
1082 
1083  /* not our database, skip */
1084  if (s->data.database != dboid)
1085  continue;
1086 
1087  /* NB: intentionally including invalidated slots */
1088 
1089  /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1090  SpinLockAcquire(&s->mutex);
1091  /* the name can't change while ReplicationSlotControlLock is held */
1092  slotname = NameStr(s->data.name);
1093  active_pid = s->active_pid;
1094  if (active_pid == 0)
1095  {
1096  MyReplicationSlot = s;
1097  s->active_pid = MyProcPid;
1098  }
1099  SpinLockRelease(&s->mutex);
1100 
1101  /*
1102  * Even though we hold an exclusive lock on the database object, a
1103  * logical slot for that DB can still be active, e.g. if it's
1104  * concurrently being dropped by a backend connected to another DB.
1105  *
1106  * That's fairly unlikely in practice, so we'll just bail out.
1107  */
1108  if (active_pid)
1109  ereport(ERROR,
1110  (errcode(ERRCODE_OBJECT_IN_USE),
1111  errmsg("replication slot \"%s\" is active for PID %d",
1112  slotname, active_pid)));
1113 
1114  /*
1115  * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1116  * holding ReplicationSlotControlLock over filesystem operations,
1117  * release ReplicationSlotControlLock and use
1118  * ReplicationSlotDropAcquired.
1119  *
1120  * As that means the set of slots could change, restart scan from the
1121  * beginning each time we release the lock.
1122  */
1123  LWLockRelease(ReplicationSlotControlLock);
1125  goto restart;
1126  }
1127  LWLockRelease(ReplicationSlotControlLock);
1128 }
1129 
1130 
1131 /*
1132  * Check whether the server's configuration supports using replication
1133  * slots.
1134  */
1135 void
1137 {
1138  /*
1139  * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1140  * needs the same check.
1141  */
1142 
1143  if (max_replication_slots == 0)
1144  ereport(ERROR,
1145  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1146  errmsg("replication slots can only be used if max_replication_slots > 0")));
1147 
1149  ereport(ERROR,
1150  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1151  errmsg("replication slots can only be used if wal_level >= replica")));
1152 }
1153 
1154 /*
1155  * Check whether the user has privilege to use replication slots.
1156  */
1157 void
1159 {
1160  if (!has_rolreplication(GetUserId()))
1161  ereport(ERROR,
1162  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1163  errmsg("permission denied to use replication slots"),
1164  errdetail("Only roles with the %s attribute may use replication slots.",
1165  "REPLICATION")));
1166 }
1167 
1168 /*
1169  * Reserve WAL for the currently active slot.
1170  *
1171  * Compute and set restart_lsn in a manner that's appropriate for the type of
1172  * the slot and concurrency safe.
1173  */
1174 void
1176 {
1178 
1179  Assert(slot != NULL);
1181 
1182  /*
1183  * The replication slot mechanism is used to prevent removal of required
1184  * WAL. As there is no interlock between this routine and checkpoints, WAL
1185  * segments could concurrently be removed when a now stale return value of
1186  * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1187  * this happens we'll just retry.
1188  */
1189  while (true)
1190  {
1191  XLogSegNo segno;
1192  XLogRecPtr restart_lsn;
1193 
1194  /*
1195  * For logical slots log a standby snapshot and start logical decoding
1196  * at exactly that position. That allows the slot to start up more
1197  * quickly. But on a standby we cannot do WAL writes, so just use the
1198  * replay pointer; effectively, an attempt to create a logical slot on
1199  * standby will cause it to wait for an xl_running_xact record to be
1200  * logged independently on the primary, so that a snapshot can be
1201  * built using the record.
1202  *
1203  * None of this is needed (or indeed helpful) for physical slots as
1204  * they'll start replay at the last logged checkpoint anyway. Instead
1205  * return the location of the last redo LSN. While that slightly
1206  * increases the chance that we have to retry, it's where a base
1207  * backup has to start replay at.
1208  */
1209  if (SlotIsPhysical(slot))
1210  restart_lsn = GetRedoRecPtr();
1211  else if (RecoveryInProgress())
1212  restart_lsn = GetXLogReplayRecPtr(NULL);
1213  else
1214  restart_lsn = GetXLogInsertRecPtr();
1215 
1216  SpinLockAcquire(&slot->mutex);
1217  slot->data.restart_lsn = restart_lsn;
1218  SpinLockRelease(&slot->mutex);
1219 
1220  /* prevent WAL removal as fast as possible */
1222 
1223  /*
1224  * If all required WAL is still there, great, otherwise retry. The
1225  * slot should prevent further removal of WAL, unless there's a
1226  * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1227  * the new restart_lsn above, so normally we should never need to loop
1228  * more than twice.
1229  */
1231  if (XLogGetLastRemovedSegno() < segno)
1232  break;
1233  }
1234 
1235  if (!RecoveryInProgress() && SlotIsLogical(slot))
1236  {
1237  XLogRecPtr flushptr;
1238 
1239  /* make sure we have enough information to start */
1240  flushptr = LogStandbySnapshot();
1241 
1242  /* and make sure it's fsynced to disk */
1243  XLogFlush(flushptr);
1244  }
1245 }
1246 
1247 /*
1248  * Report that replication slot needs to be invalidated
1249  */
1250 static void
1252  bool terminating,
1253  int pid,
1254  NameData slotname,
1255  XLogRecPtr restart_lsn,
1256  XLogRecPtr oldestLSN,
1257  TransactionId snapshotConflictHorizon)
1258 {
1259  StringInfoData err_detail;
1260  bool hint = false;
1261 
1262  initStringInfo(&err_detail);
1263 
1264  switch (cause)
1265  {
1266  case RS_INVAL_WAL_REMOVED:
1267  {
1268  unsigned long long ex = oldestLSN - restart_lsn;
1269 
1270  hint = true;
1271  appendStringInfo(&err_detail,
1272  ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1273  "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1274  ex),
1275  LSN_FORMAT_ARGS(restart_lsn),
1276  ex);
1277  break;
1278  }
1279  case RS_INVAL_HORIZON:
1280  appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1281  snapshotConflictHorizon);
1282  break;
1283 
1284  case RS_INVAL_WAL_LEVEL:
1285  appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
1286  break;
1287  case RS_INVAL_NONE:
1288  pg_unreachable();
1289  }
1290 
1291  ereport(LOG,
1292  terminating ?
1293  errmsg("terminating process %d to release replication slot \"%s\"",
1294  pid, NameStr(slotname)) :
1295  errmsg("invalidating obsolete replication slot \"%s\"",
1296  NameStr(slotname)),
1297  errdetail_internal("%s", err_detail.data),
1298  hint ? errhint("You might need to increase %s.", "max_slot_wal_keep_size") : 0);
1299 
1300  pfree(err_detail.data);
1301 }
1302 
1303 /*
1304  * Helper for InvalidateObsoleteReplicationSlots
1305  *
1306  * Acquires the given slot and mark it invalid, if necessary and possible.
1307  *
1308  * Returns whether ReplicationSlotControlLock was released in the interim (and
1309  * in that case we're not holding the lock at return, otherwise we are).
1310  *
1311  * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1312  *
1313  * This is inherently racy, because we release the LWLock
1314  * for syscalls, so caller must restart if we return true.
1315  */
1316 static bool
1318  ReplicationSlot *s,
1319  XLogRecPtr oldestLSN,
1320  Oid dboid, TransactionId snapshotConflictHorizon,
1321  bool *invalidated)
1322 {
1323  int last_signaled_pid = 0;
1324  bool released_lock = false;
1325 
1326  for (;;)
1327  {
1328  XLogRecPtr restart_lsn;
1329  NameData slotname;
1330  int active_pid = 0;
1332 
1333  Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1334 
1335  if (!s->in_use)
1336  {
1337  if (released_lock)
1338  LWLockRelease(ReplicationSlotControlLock);
1339  break;
1340  }
1341 
1342  /*
1343  * Check if the slot needs to be invalidated. If it needs to be
1344  * invalidated, and is not currently acquired, acquire it and mark it
1345  * as having been invalidated. We do this with the spinlock held to
1346  * avoid race conditions -- for example the restart_lsn could move
1347  * forward, or the slot could be dropped.
1348  */
1349  SpinLockAcquire(&s->mutex);
1350 
1351  restart_lsn = s->data.restart_lsn;
1352 
1353  /*
1354  * If the slot is already invalid or is a non conflicting slot, we
1355  * don't need to do anything.
1356  */
1357  if (s->data.invalidated == RS_INVAL_NONE)
1358  {
1359  switch (cause)
1360  {
1361  case RS_INVAL_WAL_REMOVED:
1362  if (s->data.restart_lsn != InvalidXLogRecPtr &&
1363  s->data.restart_lsn < oldestLSN)
1364  conflict = cause;
1365  break;
1366  case RS_INVAL_HORIZON:
1367  if (!SlotIsLogical(s))
1368  break;
1369  /* invalid DB oid signals a shared relation */
1370  if (dboid != InvalidOid && dboid != s->data.database)
1371  break;
1374  snapshotConflictHorizon))
1375  conflict = cause;
1378  snapshotConflictHorizon))
1379  conflict = cause;
1380  break;
1381  case RS_INVAL_WAL_LEVEL:
1382  if (SlotIsLogical(s))
1383  conflict = cause;
1384  break;
1385  case RS_INVAL_NONE:
1386  pg_unreachable();
1387  }
1388  }
1389 
1390  /* if there's no conflict, we're done */
1391  if (conflict == RS_INVAL_NONE)
1392  {
1393  SpinLockRelease(&s->mutex);
1394  if (released_lock)
1395  LWLockRelease(ReplicationSlotControlLock);
1396  break;
1397  }
1398 
1399  slotname = s->data.name;
1400  active_pid = s->active_pid;
1401 
1402  /*
1403  * If the slot can be acquired, do so and mark it invalidated
1404  * immediately. Otherwise we'll signal the owning process, below, and
1405  * retry.
1406  */
1407  if (active_pid == 0)
1408  {
1409  MyReplicationSlot = s;
1410  s->active_pid = MyProcPid;
1411  s->data.invalidated = conflict;
1412 
1413  /*
1414  * XXX: We should consider not overwriting restart_lsn and instead
1415  * just rely on .invalidated.
1416  */
1417  if (conflict == RS_INVAL_WAL_REMOVED)
1419 
1420  /* Let caller know */
1421  *invalidated = true;
1422  }
1423 
1424  SpinLockRelease(&s->mutex);
1425 
1426  if (active_pid != 0)
1427  {
1428  /*
1429  * Prepare the sleep on the slot's condition variable before
1430  * releasing the lock, to close a possible race condition if the
1431  * slot is released before the sleep below.
1432  */
1434 
1435  LWLockRelease(ReplicationSlotControlLock);
1436  released_lock = true;
1437 
1438  /*
1439  * Signal to terminate the process that owns the slot, if we
1440  * haven't already signalled it. (Avoidance of repeated
1441  * signalling is the only reason for there to be a loop in this
1442  * routine; otherwise we could rely on caller's restart loop.)
1443  *
1444  * There is the race condition that other process may own the slot
1445  * after its current owner process is terminated and before this
1446  * process owns it. To handle that, we signal only if the PID of
1447  * the owning process has changed from the previous time. (This
1448  * logic assumes that the same PID is not reused very quickly.)
1449  */
1450  if (last_signaled_pid != active_pid)
1451  {
1452  ReportSlotInvalidation(conflict, true, active_pid,
1453  slotname, restart_lsn,
1454  oldestLSN, snapshotConflictHorizon);
1455 
1456  if (MyBackendType == B_STARTUP)
1457  (void) SendProcSignal(active_pid,
1460  else
1461  (void) kill(active_pid, SIGTERM);
1462 
1463  last_signaled_pid = active_pid;
1464  }
1465 
1466  /* Wait until the slot is released. */
1468  WAIT_EVENT_REPLICATION_SLOT_DROP);
1469 
1470  /*
1471  * Re-acquire lock and start over; we expect to invalidate the
1472  * slot next time (unless another process acquires the slot in the
1473  * meantime).
1474  */
1475  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1476  continue;
1477  }
1478  else
1479  {
1480  /*
1481  * We hold the slot now and have already invalidated it; flush it
1482  * to ensure that state persists.
1483  *
1484  * Don't want to hold ReplicationSlotControlLock across file
1485  * system operations, so release it now but be sure to tell caller
1486  * to restart from scratch.
1487  */
1488  LWLockRelease(ReplicationSlotControlLock);
1489  released_lock = true;
1490 
1491  /* Make sure the invalidated state persists across server restart */
1496 
1497  ReportSlotInvalidation(conflict, false, active_pid,
1498  slotname, restart_lsn,
1499  oldestLSN, snapshotConflictHorizon);
1500 
1501  /* done with this slot for now */
1502  break;
1503  }
1504  }
1505 
1506  Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1507 
1508  return released_lock;
1509 }
1510 
1511 /*
1512  * Invalidate slots that require resources about to be removed.
1513  *
1514  * Returns true when any slot have got invalidated.
1515  *
1516  * Whether a slot needs to be invalidated depends on the cause. A slot is
1517  * removed if it:
1518  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1519  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1520  * db; dboid may be InvalidOid for shared relations
1521  * - RS_INVAL_WAL_LEVEL: is logical
1522  *
1523  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1524  */
1525 bool
1527  XLogSegNo oldestSegno, Oid dboid,
1528  TransactionId snapshotConflictHorizon)
1529 {
1530  XLogRecPtr oldestLSN;
1531  bool invalidated = false;
1532 
1533  Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1534  Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1535  Assert(cause != RS_INVAL_NONE);
1536 
1537  if (max_replication_slots == 0)
1538  return invalidated;
1539 
1540  XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1541 
1542 restart:
1543  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1544  for (int i = 0; i < max_replication_slots; i++)
1545  {
1547 
1548  if (!s->in_use)
1549  continue;
1550 
1551  if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1552  snapshotConflictHorizon,
1553  &invalidated))
1554  {
1555  /* if the lock was released, start from scratch */
1556  goto restart;
1557  }
1558  }
1559  LWLockRelease(ReplicationSlotControlLock);
1560 
1561  /*
1562  * If any slots have been invalidated, recalculate the resource limits.
1563  */
1564  if (invalidated)
1565  {
1568  }
1569 
1570  return invalidated;
1571 }
1572 
1573 /*
1574  * Flush all replication slots to disk.
1575  *
1576  * It is convenient to flush dirty replication slots at the time of checkpoint.
1577  * Additionally, in case of a shutdown checkpoint, we also identify the slots
1578  * for which the confirmed_flush LSN has been updated since the last time it
1579  * was saved and flush them.
1580  */
1581 void
1583 {
1584  int i;
1585 
1586  elog(DEBUG1, "performing replication slot checkpoint");
1587 
1588  /*
1589  * Prevent any slot from being created/dropped while we're active. As we
1590  * explicitly do *not* want to block iterating over replication_slots or
1591  * acquiring a slot we cannot take the control lock - but that's OK,
1592  * because holding ReplicationSlotAllocationLock is strictly stronger, and
1593  * enough to guarantee that nobody can change the in_use bits on us.
1594  */
1595  LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1596 
1597  for (i = 0; i < max_replication_slots; i++)
1598  {
1600  char path[MAXPGPATH];
1601 
1602  if (!s->in_use)
1603  continue;
1604 
1605  /* save the slot to disk, locking is handled in SaveSlotToPath() */
1606  sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1607 
1608  /*
1609  * Slot's data is not flushed each time the confirmed_flush LSN is
1610  * updated as that could lead to frequent writes. However, we decide
1611  * to force a flush of all logical slot's data at the time of shutdown
1612  * if the confirmed_flush LSN is changed since we last flushed it to
1613  * disk. This helps in avoiding an unnecessary retreat of the
1614  * confirmed_flush LSN after restart.
1615  */
1616  if (is_shutdown && SlotIsLogical(s))
1617  {
1618  SpinLockAcquire(&s->mutex);
1619 
1621 
1622  if (s->data.invalidated == RS_INVAL_NONE &&
1624  {
1625  s->just_dirtied = true;
1626  s->dirty = true;
1627  }
1628  SpinLockRelease(&s->mutex);
1629  }
1630 
1631  SaveSlotToPath(s, path, LOG);
1632  }
1633  LWLockRelease(ReplicationSlotAllocationLock);
1634 }
1635 
1636 /*
1637  * Load all replication slots from disk into memory at server startup. This
1638  * needs to be run before we start crash recovery.
1639  */
1640 void
1642 {
1643  DIR *replication_dir;
1644  struct dirent *replication_de;
1645 
1646  elog(DEBUG1, "starting up replication slots");
1647 
1648  /* restore all slots by iterating over all on-disk entries */
1649  replication_dir = AllocateDir("pg_replslot");
1650  while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1651  {
1652  char path[MAXPGPATH + 12];
1653  PGFileType de_type;
1654 
1655  if (strcmp(replication_de->d_name, ".") == 0 ||
1656  strcmp(replication_de->d_name, "..") == 0)
1657  continue;
1658 
1659  snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1660  de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1661 
1662  /* we're only creating directories here, skip if it's not our's */
1663  if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
1664  continue;
1665 
1666  /* we crashed while a slot was being setup or deleted, clean up */
1667  if (pg_str_endswith(replication_de->d_name, ".tmp"))
1668  {
1669  if (!rmtree(path, true))
1670  {
1671  ereport(WARNING,
1672  (errmsg("could not remove directory \"%s\"",
1673  path)));
1674  continue;
1675  }
1676  fsync_fname("pg_replslot", true);
1677  continue;
1678  }
1679 
1680  /* looks like a slot in a normal state, restore */
1681  RestoreSlotFromDisk(replication_de->d_name);
1682  }
1683  FreeDir(replication_dir);
1684 
1685  /* currently no slots exist, we're done. */
1686  if (max_replication_slots <= 0)
1687  return;
1688 
1689  /* Now that we have recovered all the data, compute replication xmin */
1692 }
1693 
1694 /* ----
1695  * Manipulation of on-disk state of replication slots
1696  *
1697  * NB: none of the routines below should take any notice whether a slot is the
1698  * current one or not, that's all handled a layer above.
1699  * ----
1700  */
1701 static void
1703 {
1704  char tmppath[MAXPGPATH];
1705  char path[MAXPGPATH];
1706  struct stat st;
1707 
1708  /*
1709  * No need to take out the io_in_progress_lock, nobody else can see this
1710  * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1711  * takes out the lock, if we'd take the lock here, we'd deadlock.
1712  */
1713 
1714  sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1715  sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1716 
1717  /*
1718  * It's just barely possible that some previous effort to create or drop a
1719  * slot with this name left a temp directory lying around. If that seems
1720  * to be the case, try to remove it. If the rmtree() fails, we'll error
1721  * out at the MakePGDirectory() below, so we don't bother checking
1722  * success.
1723  */
1724  if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1725  rmtree(tmppath, true);
1726 
1727  /* Create and fsync the temporary slot directory. */
1728  if (MakePGDirectory(tmppath) < 0)
1729  ereport(ERROR,
1731  errmsg("could not create directory \"%s\": %m",
1732  tmppath)));
1733  fsync_fname(tmppath, true);
1734 
1735  /* Write the actual state file. */
1736  slot->dirty = true; /* signal that we really need to write */
1737  SaveSlotToPath(slot, tmppath, ERROR);
1738 
1739  /* Rename the directory into place. */
1740  if (rename(tmppath, path) != 0)
1741  ereport(ERROR,
1743  errmsg("could not rename file \"%s\" to \"%s\": %m",
1744  tmppath, path)));
1745 
1746  /*
1747  * If we'd now fail - really unlikely - we wouldn't know whether this slot
1748  * would persist after an OS crash or not - so, force a restart. The
1749  * restart would try to fsync this again till it works.
1750  */
1752 
1753  fsync_fname(path, true);
1754  fsync_fname("pg_replslot", true);
1755 
1756  END_CRIT_SECTION();
1757 }
1758 
1759 /*
1760  * Shared functionality between saving and creating a replication slot.
1761  */
1762 static void
1763 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1764 {
1765  char tmppath[MAXPGPATH];
1766  char path[MAXPGPATH];
1767  int fd;
1769  bool was_dirty;
1770 
1771  /* first check whether there's something to write out */
1772  SpinLockAcquire(&slot->mutex);
1773  was_dirty = slot->dirty;
1774  slot->just_dirtied = false;
1775  SpinLockRelease(&slot->mutex);
1776 
1777  /* and don't do anything if there's nothing to write */
1778  if (!was_dirty)
1779  return;
1780 
1782 
1783  /* silence valgrind :( */
1784  memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1785 
1786  sprintf(tmppath, "%s/state.tmp", dir);
1787  sprintf(path, "%s/state", dir);
1788 
1789  fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1790  if (fd < 0)
1791  {
1792  /*
1793  * If not an ERROR, then release the lock before returning. In case
1794  * of an ERROR, the error recovery path automatically releases the
1795  * lock, but no harm in explicitly releasing even in that case. Note
1796  * that LWLockRelease() could affect errno.
1797  */
1798  int save_errno = errno;
1799 
1801  errno = save_errno;
1802  ereport(elevel,
1804  errmsg("could not create file \"%s\": %m",
1805  tmppath)));
1806  return;
1807  }
1808 
1809  cp.magic = SLOT_MAGIC;
1810  INIT_CRC32C(cp.checksum);
1811  cp.version = SLOT_VERSION;
1813 
1814  SpinLockAcquire(&slot->mutex);
1815 
1816  memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1817 
1818  SpinLockRelease(&slot->mutex);
1819 
1820  COMP_CRC32C(cp.checksum,
1821  (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
1823  FIN_CRC32C(cp.checksum);
1824 
1825  errno = 0;
1826  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
1827  if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1828  {
1829  int save_errno = errno;
1830 
1834 
1835  /* if write didn't set errno, assume problem is no disk space */
1836  errno = save_errno ? save_errno : ENOSPC;
1837  ereport(elevel,
1839  errmsg("could not write to file \"%s\": %m",
1840  tmppath)));
1841  return;
1842  }
1844 
1845  /* fsync the temporary file */
1846  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
1847  if (pg_fsync(fd) != 0)
1848  {
1849  int save_errno = errno;
1850 
1854  errno = save_errno;
1855  ereport(elevel,
1857  errmsg("could not fsync file \"%s\": %m",
1858  tmppath)));
1859  return;
1860  }
1862 
1863  if (CloseTransientFile(fd) != 0)
1864  {
1865  int save_errno = errno;
1866 
1868  errno = save_errno;
1869  ereport(elevel,
1871  errmsg("could not close file \"%s\": %m",
1872  tmppath)));
1873  return;
1874  }
1875 
1876  /* rename to permanent file, fsync file and directory */
1877  if (rename(tmppath, path) != 0)
1878  {
1879  int save_errno = errno;
1880 
1882  errno = save_errno;
1883  ereport(elevel,
1885  errmsg("could not rename file \"%s\" to \"%s\": %m",
1886  tmppath, path)));
1887  return;
1888  }
1889 
1890  /*
1891  * Check CreateSlotOnDisk() for the reasoning of using a critical section.
1892  */
1894 
1895  fsync_fname(path, false);
1896  fsync_fname(dir, true);
1897  fsync_fname("pg_replslot", true);
1898 
1899  END_CRIT_SECTION();
1900 
1901  /*
1902  * Successfully wrote, unset dirty bit, unless somebody dirtied again
1903  * already and remember the confirmed_flush LSN value.
1904  */
1905  SpinLockAcquire(&slot->mutex);
1906  if (!slot->just_dirtied)
1907  slot->dirty = false;
1909  SpinLockRelease(&slot->mutex);
1910 
1912 }
1913 
1914 /*
1915  * Load a single slot from disk into memory.
1916  */
1917 static void
1919 {
1921  int i;
1922  char slotdir[MAXPGPATH + 12];
1923  char path[MAXPGPATH + 22];
1924  int fd;
1925  bool restored = false;
1926  int readBytes;
1927  pg_crc32c checksum;
1928 
1929  /* no need to lock here, no concurrent access allowed yet */
1930 
1931  /* delete temp file if it exists */
1932  sprintf(slotdir, "pg_replslot/%s", name);
1933  sprintf(path, "%s/state.tmp", slotdir);
1934  if (unlink(path) < 0 && errno != ENOENT)
1935  ereport(PANIC,
1937  errmsg("could not remove file \"%s\": %m", path)));
1938 
1939  sprintf(path, "%s/state", slotdir);
1940 
1941  elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1942 
1943  /* on some operating systems fsyncing a file requires O_RDWR */
1944  fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1945 
1946  /*
1947  * We do not need to handle this as we are rename()ing the directory into
1948  * place only after we fsync()ed the state file.
1949  */
1950  if (fd < 0)
1951  ereport(PANIC,
1953  errmsg("could not open file \"%s\": %m", path)));
1954 
1955  /*
1956  * Sync state file before we're reading from it. We might have crashed
1957  * while it wasn't synced yet and we shouldn't continue on that basis.
1958  */
1959  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
1960  if (pg_fsync(fd) != 0)
1961  ereport(PANIC,
1963  errmsg("could not fsync file \"%s\": %m",
1964  path)));
1966 
1967  /* Also sync the parent directory */
1969  fsync_fname(slotdir, true);
1970  END_CRIT_SECTION();
1971 
1972  /* read part of statefile that's guaranteed to be version independent */
1973  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1974  readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1976  if (readBytes != ReplicationSlotOnDiskConstantSize)
1977  {
1978  if (readBytes < 0)
1979  ereport(PANIC,
1981  errmsg("could not read file \"%s\": %m", path)));
1982  else
1983  ereport(PANIC,
1985  errmsg("could not read file \"%s\": read %d of %zu",
1986  path, readBytes,
1988  }
1989 
1990  /* verify magic */
1991  if (cp.magic != SLOT_MAGIC)
1992  ereport(PANIC,
1994  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1995  path, cp.magic, SLOT_MAGIC)));
1996 
1997  /* verify version */
1998  if (cp.version != SLOT_VERSION)
1999  ereport(PANIC,
2001  errmsg("replication slot file \"%s\" has unsupported version %u",
2002  path, cp.version)));
2003 
2004  /* boundary check on length */
2006  ereport(PANIC,
2008  errmsg("replication slot file \"%s\" has corrupted length %u",
2009  path, cp.length)));
2010 
2011  /* Now that we know the size, read the entire file */
2012  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2013  readBytes = read(fd,
2014  (char *) &cp + ReplicationSlotOnDiskConstantSize,
2015  cp.length);
2017  if (readBytes != cp.length)
2018  {
2019  if (readBytes < 0)
2020  ereport(PANIC,
2022  errmsg("could not read file \"%s\": %m", path)));
2023  else
2024  ereport(PANIC,
2026  errmsg("could not read file \"%s\": read %d of %zu",
2027  path, readBytes, (Size) cp.length)));
2028  }
2029 
2030  if (CloseTransientFile(fd) != 0)
2031  ereport(PANIC,
2033  errmsg("could not close file \"%s\": %m", path)));
2034 
2035  /* now verify the CRC */
2036  INIT_CRC32C(checksum);
2037  COMP_CRC32C(checksum,
2040  FIN_CRC32C(checksum);
2041 
2042  if (!EQ_CRC32C(checksum, cp.checksum))
2043  ereport(PANIC,
2044  (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2045  path, checksum, cp.checksum)));
2046 
2047  /*
2048  * If we crashed with an ephemeral slot active, don't restore but delete
2049  * it.
2050  */
2052  {
2053  if (!rmtree(slotdir, true))
2054  {
2055  ereport(WARNING,
2056  (errmsg("could not remove directory \"%s\"",
2057  slotdir)));
2058  }
2059  fsync_fname("pg_replslot", true);
2060  return;
2061  }
2062 
2063  /*
2064  * Verify that requirements for the specific slot type are met. That's
2065  * important because if these aren't met we're not guaranteed to retain
2066  * all the necessary resources for the slot.
2067  *
2068  * NB: We have to do so *after* the above checks for ephemeral slots,
2069  * because otherwise a slot that shouldn't exist anymore could prevent
2070  * restarts.
2071  *
2072  * NB: Changing the requirements here also requires adapting
2073  * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2074  */
2076  ereport(FATAL,
2077  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2078  errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
2079  NameStr(cp.slotdata.name)),
2080  errhint("Change wal_level to be logical or higher.")));
2081  else if (wal_level < WAL_LEVEL_REPLICA)
2082  ereport(FATAL,
2083  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2084  errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
2085  NameStr(cp.slotdata.name)),
2086  errhint("Change wal_level to be replica or higher.")));
2087 
2088  /* nothing can be active yet, don't lock anything */
2089  for (i = 0; i < max_replication_slots; i++)
2090  {
2091  ReplicationSlot *slot;
2092 
2094 
2095  if (slot->in_use)
2096  continue;
2097 
2098  /* restore the entire set of persistent data */
2099  memcpy(&slot->data, &cp.slotdata,
2101 
2102  /* initialize in memory state */
2103  slot->effective_xmin = cp.slotdata.xmin;
2106 
2111 
2112  slot->in_use = true;
2113  slot->active_pid = 0;
2114 
2115  restored = true;
2116  break;
2117  }
2118 
2119  if (!restored)
2120  ereport(FATAL,
2121  (errmsg("too many replication slots active before shutdown"),
2122  errhint("Increase max_replication_slots and try again.")));
2123 }
#define InvalidBackendId
Definition: backendid.h:23
#define NameStr(name)
Definition: c.h:735
unsigned int uint32
Definition: c.h:495
#define ngettext(s, p, n)
Definition: c.h:1194
#define PG_BINARY
Definition: c.h:1283
#define pg_unreachable()
Definition: c.h:285
#define MemSet(start, val, len)
Definition: c.h:1009
uint32 TransactionId
Definition: c.h:641
size_t Size
Definition: c.h:594
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1229
int errcode_for_file_access(void)
Definition: elog.c:881
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define _(x)
Definition: elog.c:91
#define LOG
Definition: elog.h:31
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2854
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3858
int FreeDir(DIR *dir)
Definition: fd.c:2906
int CloseTransientFile(int fd)
Definition: fd.c:2754
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:708
int pg_fsync(int fd)
Definition: fd.c:361
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2578
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2788
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:525
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
int MyProcPid
Definition: globals.c:44
bool IsUnderPostmaster
Definition: globals.c:113
Oid MyDatabaseId
Definition: globals.c:89
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1920
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1964
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:730
@ LWTRANCHE_REPLICATION_SLOT_IO
Definition: lwlock.h:191
@ LW_SHARED
Definition: lwlock.h:117
@ LW_EXCLUSIVE
Definition: lwlock.h:116
void pfree(void *pointer)
Definition: mcxt.c:1456
#define START_CRIT_SECTION()
Definition: miscadmin.h:148
@ B_STARTUP
Definition: miscadmin.h:338
#define END_CRIT_SECTION()
Definition: miscadmin.h:150
Oid GetUserId(void)
Definition: miscinit.c:509
BackendType MyBackendType
Definition: miscinit.c:63
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:706
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
static bool two_phase
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PROC_IN_LOGICAL_DECODING
Definition: proc.h:60
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3847
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition: procsignal.h:46
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:376
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:409
#define ReplicationSlotOnDiskChecksummedSize
Definition: slot.c:86
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:1582
void ReplicationSlotCleanup(void)
Definition: slot.c:605
void ReplicationSlotMarkDirty(void)
Definition: slot.c:798
void ReplicationSlotReserveWal(void)
Definition: slot.c:1175
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1000
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:452
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:1526
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1058
#define ReplicationSlotOnDiskNotChecksummedSize
Definition: slot.c:83
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:942
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:837
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:1918
void ReplicationSlotPersist(void)
Definition: slot.c:815
ReplicationSlot * MyReplicationSlot
Definition: slot.c:99
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:1763
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:643
void ReplicationSlotSave(void)
Definition: slot.c:780
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:1702
#define ReplicationSlotOnDiskV2Size
Definition: slot.c:89
void CheckSlotPermissions(void)
Definition: slot.c:1158
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:425
void ReplicationSlotsShmemInit(void)
Definition: slot.c:136
void ReplicationSlotRelease(void)
Definition: slot.c:549
int max_replication_slots
Definition: slot.c:102
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:96
#define SLOT_VERSION
Definition: slot.c:93
struct ReplicationSlotOnDisk ReplicationSlotOnDisk
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:893
void ReplicationSlotInitialize(void)
Definition: slot.c:171
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:673
static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
Definition: slot.c:1317
void StartupReplicationSlots(void)
Definition: slot.c:1641
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:253
void CheckSlotRequirements(void)
Definition: slot.c:1136
#define SLOT_MAGIC
Definition: slot.c:92
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon)
Definition: slot.c:1251
static void ReplicationSlotDropAcquired(void)
Definition: slot.c:656
#define ReplicationSlotOnDiskConstantSize
Definition: slot.c:80
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:118
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:199
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:180
ReplicationSlotPersistency
Definition: slot.h:34
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
#define SlotIsPhysical(slot)
Definition: slot.h:190
ReplicationSlotInvalidationCause
Definition: slot.h:45
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:48
@ RS_INVAL_HORIZON
Definition: slot.h:50
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:52
@ RS_INVAL_NONE
Definition: slot.h:46
#define SlotIsLogical(slot)
Definition: slot.h:191
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1287
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:32
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: dirent.c:26
uint8 statusFlags
Definition: proc.h:233
int pgxactoff
Definition: proc.h:188
uint8 * statusFlags
Definition: proc.h:377
ReplicationSlot replication_slots[1]
Definition: slot.h:202
uint32 version
Definition: slot.c:68
ReplicationSlotPersistentData slotdata
Definition: slot.c:76
pg_crc32c checksum
Definition: slot.c:65
TransactionId xmin
Definition: slot.h:77
TransactionId catalog_xmin
Definition: slot.h:85
XLogRecPtr restart_lsn
Definition: slot.h:88
XLogRecPtr confirmed_flush
Definition: slot.h:99
ReplicationSlotPersistency persistency
Definition: slot.h:69
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:91
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:178
TransactionId effective_catalog_xmin
Definition: slot.h:159
slock_t mutex
Definition: slot.h:135
XLogRecPtr candidate_restart_valid
Definition: slot.h:179
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:187
pid_t active_pid
Definition: slot.h:141
bool in_use
Definition: slot.h:138
TransactionId effective_xmin
Definition: slot.h:158
bool just_dirtied
Definition: slot.h:144
XLogRecPtr candidate_restart_lsn
Definition: slot.h:180
LWLock io_in_progress_lock
Definition: slot.h:165
ConditionVariable active_cv
Definition: slot.h:168
TransactionId candidate_catalog_xmin
Definition: slot.h:177
bool dirty
Definition: slot.h:145
ReplicationSlotPersistentData data
Definition: slot.h:162
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
Definition: type.h:95
Definition: c.h:730
unsigned short st_mode
Definition: win32_port.h:268
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
const char * name
#define stat
Definition: win32_port.h:284
#define S_ISDIR(m)
Definition: win32_port.h:325
#define kill(pid, sig)
Definition: win32_port.h:485
bool RecoveryInProgress(void)
Definition: xlog.c:5948
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3491
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6051
int wal_level
Definition: xlog.c:134
int wal_segment_size
Definition: xlog.c:146
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2421
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:8923
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2535
@ WAL_LEVEL_REPLICA
Definition: xlog.h:70
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:71
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)