PostgreSQL Source Code  git master
slot.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * slot.c
4  * Replication slot management.
5  *
6  *
7  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/slot.c
12  *
13  * NOTES
14  *
15  * Replication slots are used to keep state about replication streams
16  * originating from this cluster. Their primary purpose is to prevent the
17  * premature removal of WAL or of old tuple versions in a manner that would
18  * interfere with replication; they are also useful for monitoring purposes.
19  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20  * on standbys (to support cascading setups). The requirement that slots be
21  * usable on standbys precludes storing them in the system catalogs.
22  *
23  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24  * directory. Inside that directory the state file will contain the slot's
25  * own data. Additional data can be stored alongside that file if required.
26  * While the server is running, the state data is also cached in memory for
27  * efficiency.
28  *
29  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31  * to iterate over the slots, and in exclusive mode to change the in_use flag
32  * of a slot. The remaining data in each slot is protected by its mutex.
33  *
34  *-------------------------------------------------------------------------
35  */
36 
37 #include "postgres.h"
38 
39 #include <unistd.h>
40 #include <sys/stat.h>
41 
42 #include "access/transam.h"
43 #include "access/xlog_internal.h"
44 #include "access/xlogrecovery.h"
45 #include "common/file_utils.h"
46 #include "common/string.h"
47 #include "miscadmin.h"
48 #include "pgstat.h"
49 #include "replication/slotsync.h"
50 #include "replication/slot.h"
51 #include "storage/fd.h"
52 #include "storage/ipc.h"
53 #include "storage/proc.h"
54 #include "storage/procarray.h"
55 #include "utils/builtins.h"
56 
57 /*
58  * Replication slot on-disk data structure.
 *
 * The struct is laid out as a version-independent prefix followed by
 * version-dependent slot data; the size macros that follow this definition
 * are derived from the offsets of its `version` and `slotdata` members.
 *
 * NOTE(review): the member declarations and the closing brace / typedef name
 * (original lines 65-66, 69-70 and 77-78) are missing from this extraction.
59  */
60 typedef struct ReplicationSlotOnDisk
61 {
62  /* first part of this struct needs to be version independent */
63 
64  /* data not covered by checksum */
67 
68  /* data covered by checksum */
71 
72  /*
73  * The actual data in the slot that follows can differ based on the above
74  * 'version'.
75  */
76 
79 
80 /*
81  * Lookup table for slot invalidation causes.
82  */
83 const char *const SlotInvalidationCauses[] = {
84  [RS_INVAL_NONE] = "none",
85  [RS_INVAL_WAL_REMOVED] = "wal_removed",
86  [RS_INVAL_HORIZON] = "rows_removed",
87  [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
88 };
89 
90 /* Maximum number of invalidation causes */
91 #define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
92 
94  "array length mismatch");
95 
/*
 * Sizes of the regions of ReplicationSlotOnDisk.
 *
 * Every expansion that is more than a single operand is wrapped in
 * parentheses, so these macros behave as one value inside larger
 * expressions (e.g. "2 * ReplicationSlotOnDiskChecksummedSize" or
 * "x - ReplicationSlotOnDiskV2Size") instead of being torn apart by
 * operator precedence.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
111 
112 /* Control array for replication slot management */
114 
115 /* My backend's replication slot in the shared memory array */
117 
/*
 * GUC variable: the maximum number of replication slots.  When it is zero,
 * no shared memory is set aside for the slot array at all.
 */
int			max_replication_slots = 10;
121 
/* Releases this backend's acquired slot and cleans up its temporary slots */
122 static void ReplicationSlotShmemExit(int code, Datum arg);
/* Permanently removes a slot: its on-disk directory, in_use flag and stats */
123 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
124 
125 /* internal persistency functions */
/* NOTE(review): the definitions of these three are outside this view */
126 static void RestoreSlotFromDisk(const char *name);
127 static void CreateSlotOnDisk(ReplicationSlot *slot);
128 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
129 
130 /*
131  * Report shared-memory space needed by ReplicationSlotsShmemInit.
 *
 * Returns 0 when max_replication_slots is 0, because no slot array is set
 * up in that case.  Otherwise the size starts from the fixed part of
 * ReplicationSlotCtlData (everything before its replication_slots member)
 * and is then grown with add_size().
 *
 * NOTE(review): original lines 134 (the function's name line) and 143 (the
 * second add_size() argument, presumably the per-slot array size) are
 * missing from this extraction.
132  */
133 Size
135 {
136  Size size = 0;
137 
138  if (max_replication_slots == 0)
139  return size;
140 
141  size = offsetof(ReplicationSlotCtlData, replication_slots);
142  size = add_size(size,
144 
145  return size;
146 }
147 
148 /*
149  * Allocate and initialize shared memory for replication slots.
 *
 * Does nothing when max_replication_slots is 0.  ShmemInitStruct() reports
 * through `found` whether the "ReplicationSlot Ctl" struct already existed;
 * only when it did not (first time through) is each slot's spinlock
 * initialized.
 *
 * NOTE(review): original lines 152 (name line), 159 (the assignment target
 * of ShmemInitStruct), 168 (the memset the comment inside the loop refers
 * to), 172 (the per-slot `slot` pointer) and 176-178 are missing from this
 * extraction.
150  */
151 void
153 {
154  bool found;
155 
156  if (max_replication_slots == 0)
157  return;
158 
160  ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
161  &found);
162 
163  if (!found)
164  {
165  int i;
166 
167  /* First time through, so initialize */
169 
170  for (i = 0; i < max_replication_slots; i++)
171  {
173 
174  /* everything else is zeroed by the memset above */
175  SpinLockInit(&slot->mutex);
179  }
180  }
181 }
182 
183 /*
184  * Register the callback for replication slot cleanup and releasing.
 *
 * NOTE(review): the name line (187) and the entire body (189) of this
 * function are missing from this extraction.  Presumably it registers
 * ReplicationSlotShmemExit(), whose (int code, Datum arg) signature looks
 * like an exit callback; confirm against the full source.
185  */
186 void
188 {
190 }
191 
192 /*
193  * Release and cleanup replication slots.
 *
 * First releases the slot this backend has acquired (if any), then cleans
 * up all of its temporary slots.
 *
 * NOTE(review): original lines 196 (name line), 200 (the statement guarded
 * by the MyReplicationSlot check) and 203 (the temporary-slot cleanup call)
 * are missing from this extraction; as shown, the `if` has no visible body.
194  */
195 static void
197 {
198  /* Make sure active replication slots are released */
199  if (MyReplicationSlot != NULL)
201 
202  /* Also cleanup all the temporary slots. */
204 }
205 
206 /*
207  * Check whether the passed slot name is valid and report errors at elevel.
208  *
209  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
210  * the name to be used as a directory name on every supported OS.
211  *
212  * Returns whether the directory name is valid or not if elevel < ERROR.
213  */
214 bool
215 ReplicationSlotValidateName(const char *name, int elevel)
216 {
217  const char *cp;
218 
219  if (strlen(name) == 0)
220  {
221  ereport(elevel,
222  (errcode(ERRCODE_INVALID_NAME),
223  errmsg("replication slot name \"%s\" is too short",
224  name)));
225  return false;
226  }
227 
228  if (strlen(name) >= NAMEDATALEN)
229  {
230  ereport(elevel,
231  (errcode(ERRCODE_NAME_TOO_LONG),
232  errmsg("replication slot name \"%s\" is too long",
233  name)));
234  return false;
235  }
236 
237  for (cp = name; *cp; cp++)
238  {
239  if (!((*cp >= 'a' && *cp <= 'z')
240  || (*cp >= '0' && *cp <= '9')
241  || (*cp == '_')))
242  {
243  ereport(elevel,
244  (errcode(ERRCODE_INVALID_NAME),
245  errmsg("replication slot name \"%s\" contains invalid character",
246  name),
247  errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
248  return false;
249  }
250  }
251  return true;
252 }
253 
254 /*
255  * Create a new replication slot and mark it as used by this backend.
256  *
257  * name: Name of the slot
258  * db_specific: logical decoding is db specific; if the slot is going to
259  * be used for that pass true, otherwise false.
260  * two_phase: Allows decoding of prepared transactions. We allow this option
261  * to be enabled only at the slot creation time. If we allow this option
262  * to be changed during decoding then it is quite possible that we skip
263  * prepare first time because this option was not enabled. Now next time
264  * during getting changes, if the two_phase option is enabled it can skip
265  * prepare because by that time start decoding point has been moved. So the
266  * user will only get commit prepared.
267  * failover: If enabled, allows the slot to be synced to standbys so
268  * that logical replication can be resumed after failover.
269  * synced: True if the slot is synchronized from the primary server.
 * persistency: stored as-is in the slot's persistent data.  RS_TEMPORARY
 * combined with failover is rejected unless IsSyncingReplicationSlots().
 *
 * Raises ERROR if a slot with this name already exists or if every entry of
 * the slot array is in use.  On success the slot is in_use, its active_pid
 * is this process, and it becomes MyReplicationSlot.
 *
 * Locking: ReplicationSlotAllocationLock is held exclusively for the whole
 * allocation, so two backends cannot pick the same free entry or race on
 * the same name/directory.  ReplicationSlotControlLock is taken in shared
 * mode for the scan and in exclusive mode while flipping in_use.
 *
 * NOTE(review): original lines 281, 293, 328, 332, 361, 368-374, 408 and
 * 417 are missing from this extraction.  In particular, the condition that
 * should guard the first ereport(ERROR) in the failover branch (line 293)
 * is not visible, so as shown that error would look unconditional.
270  */
271 void
272 ReplicationSlotCreate(const char *name, bool db_specific,
273  ReplicationSlotPersistency persistency,
274  bool two_phase, bool failover, bool synced)
275 {
276  ReplicationSlot *slot = NULL;
277  int i;
278 
279  Assert(MyReplicationSlot == NULL);
280 
282 
283  if (failover)
284  {
285  /*
286  * Do not allow users to create the failover enabled slots on the
287  * standby as we do not support sync to the cascading standby.
288  *
289  * However, failover enabled slots can be created during slot
290  * synchronization because we need to retain the same values as the
291  * remote slot.
292  */
294  ereport(ERROR,
295  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
296  errmsg("cannot enable failover for a replication slot created on the standby"));
297 
298  /*
299  * Do not allow users to create failover enabled temporary slots,
300  * because temporary slots will not be synced to the standby.
301  *
302  * However, failover enabled temporary slots can be created during
303  * slot synchronization. See the comments atop slotsync.c for details.
304  */
305  if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
306  ereport(ERROR,
307  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
308  errmsg("cannot enable failover for a temporary replication slot"));
309  }
310 
311  /*
312  * If some other backend ran this code concurrently with us, we'd likely
313  * both allocate the same slot, and that would be bad. We'd also be at
314  * risk of missing a name collision. Also, we don't want to try to create
315  * a new slot while somebody's busy cleaning up an old one, because we
316  * might both be monkeying with the same directory.
317  */
318  LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
319 
320  /*
321  * Check for name collision, and identify an allocatable slot. We need to
322  * hold ReplicationSlotControlLock in shared mode for this, so that nobody
323  * else can change the in_use flags while we're looking at them.
324  */
325  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
326  for (i = 0; i < max_replication_slots; i++)
327  {
329 
330  if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
331  ereport(ERROR,
333  errmsg("replication slot \"%s\" already exists", name)));
	/* remember the first free entry, but keep scanning for name clashes */
334  if (!s->in_use && slot == NULL)
335  slot = s;
336  }
337  LWLockRelease(ReplicationSlotControlLock);
338 
339  /* If all slots are in use, we're out of luck. */
340  if (slot == NULL)
341  ereport(ERROR,
342  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
343  errmsg("all replication slots are in use"),
344  errhint("Free one or increase max_replication_slots.")));
345 
346  /*
347  * Since this slot is not in use, nobody should be looking at any part of
348  * it other than the in_use field unless they're trying to allocate it.
349  * And since we hold ReplicationSlotAllocationLock, nobody except us can
350  * be doing that. So it's safe to initialize the slot.
351  */
352  Assert(!slot->in_use);
353  Assert(slot->active_pid == 0);
354 
355  /* first initialize persistent data */
356  memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
357  namestrcpy(&slot->data.name, name);
358  slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
359  slot->data.persistency = persistency;
360  slot->data.two_phase = two_phase;
362  slot->data.failover = failover;
363  slot->data.synced = synced;
364 
365  /* and then data only present in shared memory */
366  slot->just_dirtied = false;
367  slot->dirty = false;
375 
376  /*
377  * Create the slot on disk. We haven't actually marked the slot allocated
378  * yet, so no special cleanup is required if this errors out.
379  */
380  CreateSlotOnDisk(slot);
381 
382  /*
383  * We need to briefly prevent any other backend from iterating over the
384  * slots while we flip the in_use flag. We also need to set the active
385  * flag while holding the ControlLock as otherwise a concurrent
386  * ReplicationSlotAcquire() could acquire the slot as well.
387  */
388  LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
389 
390  slot->in_use = true;
391 
392  /* We can now mark the slot active, and that makes it our slot. */
393  SpinLockAcquire(&slot->mutex);
394  Assert(slot->active_pid == 0);
395  slot->active_pid = MyProcPid;
396  SpinLockRelease(&slot->mutex);
397  MyReplicationSlot = slot;
398 
399  LWLockRelease(ReplicationSlotControlLock);
400 
401  /*
402  * Create statistics entry for the new logical slot. We don't collect any
403  * stats for physical slots, so no need to create an entry for the same.
404  * See ReplicationSlotDropPtr for why we need to do this before releasing
405  * ReplicationSlotAllocationLock.
406  */
407  if (SlotIsLogical(slot))
409 
410  /*
411  * Now that the slot has been marked as in_use and active, it's safe to
412  * let somebody else try to allocate a slot.
413  */
414  LWLockRelease(ReplicationSlotAllocationLock);
415 
416  /* Let everybody know we've modified this slot */
418 }
419 
420 /*
421  * Search for the named replication slot.
422  *
423  * Return the replication slot if found, otherwise NULL.
 *
 * Only entries with in_use set are considered.  If need_lock is true,
 * ReplicationSlotControlLock is taken in shared mode around the scan.
 * Callers passing false presumably already hold that lock, as
 * ReplicationSlotAcquire does.
 *
 * NOTE(review): original lines 425 (return type) and 436 (the per-entry
 * pointer `s`) are missing from this extraction.
424  */
426 SearchNamedReplicationSlot(const char *name, bool need_lock)
427 {
428  int i;
429  ReplicationSlot *slot = NULL;
430 
431  if (need_lock)
432  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
433 
434  for (i = 0; i < max_replication_slots; i++)
435  {
437 
438  if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
439  {
440  slot = s;
441  break;
442  }
443  }
444 
445  if (need_lock)
446  LWLockRelease(ReplicationSlotControlLock);
447 
448  return slot;
449 }
450 
451 /*
452  * Return the index of the replication slot in
453  * ReplicationSlotCtl->replication_slots.
454  *
455  * This is mainly useful to have an efficient key for storing replication slot
456  * stats.
 *
 * The index is computed by pointer subtraction, so `slot` must point into
 * the shared slot array.  The assertion checks that it does; its visible
 * part checks the upper bound.
 *
 * NOTE(review): original lines 459 (name/signature) and 461 (the start of
 * the assertion) are missing from this extraction.
457  */
458 int
460 {
462  slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
463 
464  return slot - ReplicationSlotCtl->replication_slots;
465 }
466 
467 /*
468  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
469  * the slot's name and true is returned.
470  *
471  * This likely is only useful for pgstat_replslot.c during shutdown, in other
472  * cases there are obvious TOCTOU issues.
 *
 * NOTE(review): original lines 475 (signature) and 480 (the lookup of
 * `slot` from 'index') are missing from this extraction.
473  */
474 bool
476 {
477  ReplicationSlot *slot;
478  bool found;
479 
481 
482  /*
483  * Ensure that the slot cannot be dropped while we copy the name. Don't
484  * need the spinlock as the name of an existing slot cannot change.
485  */
486  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
487  found = slot->in_use;
488  if (slot->in_use)
489  namestrcpy(name, NameStr(slot->data.name));
490  LWLockRelease(ReplicationSlotControlLock);
491 
492  return found;
493 }
494 
495 /*
496  * Find a previously created slot and mark it as used by this process.
497  *
498  * An error is raised if nowait is true and the slot is currently in use. If
499  * nowait is false, we sleep until the slot is released by the owning process.
 *
 * Also raises ERROR if no in-use slot has the given name.  On success the
 * slot's active_pid is this process and the slot becomes MyReplicationSlot.
 * In single-user mode (!IsUnderPostmaster) the ownership check is skipped.
 *
 * NOTE(review): original lines 538, 560, 562, 575, 586 and 590 are missing
 * from this extraction.  Presumably they are the condition-variable
 * prepare/sleep calls, the wakeup after acquiring, the logical-slot stats
 * call described below, and the opening of the walsender log report.
500  */
501 void
502 ReplicationSlotAcquire(const char *name, bool nowait)
503 {
504  ReplicationSlot *s;
505  int active_pid;
506 
507  Assert(name != NULL);
508 
509 retry:
510  Assert(MyReplicationSlot == NULL);
511 
512  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
513 
514  /* Check if the slot exists with the given name. */
515  s = SearchNamedReplicationSlot(name, false);
516  if (s == NULL || !s->in_use)
517  {
518  LWLockRelease(ReplicationSlotControlLock);
519 
520  ereport(ERROR,
521  (errcode(ERRCODE_UNDEFINED_OBJECT),
522  errmsg("replication slot \"%s\" does not exist",
523  name)));
524  }
525 
526  /*
527  * This is the slot we want; check if it's active under some other
528  * process. In single user mode, we don't need this check.
529  */
530  if (IsUnderPostmaster)
531  {
532  /*
533  * Get ready to sleep on the slot in case it is active. (We may end
534  * up not sleeping, but we don't want to do this while holding the
535  * spinlock.)
536  */
537  if (!nowait)
539 
540  SpinLockAcquire(&s->mutex);
		/* claim the slot only if unowned; either way record the current owner */
541  if (s->active_pid == 0)
542  s->active_pid = MyProcPid;
543  active_pid = s->active_pid;
544  SpinLockRelease(&s->mutex);
545  }
546  else
547  active_pid = MyProcPid;
548  LWLockRelease(ReplicationSlotControlLock);
549 
550  /*
551  * If we found the slot but it's already active in another process, we
552  * wait until the owning process signals us that it's been released, or
553  * error out.
554  */
555  if (active_pid != MyProcPid)
556  {
557  if (!nowait)
558  {
559  /* Wait here until we get signaled, and then restart */
561  WAIT_EVENT_REPLICATION_SLOT_DROP);
563  goto retry;
564  }
565 
566  ereport(ERROR,
567  (errcode(ERRCODE_OBJECT_IN_USE),
568  errmsg("replication slot \"%s\" is active for PID %d",
569  NameStr(s->data.name), active_pid)));
570  }
571  else if (!nowait)
572  ConditionVariableCancelSleep(); /* no sleep needed after all */
573 
574  /* Let everybody know we've modified this slot */
576 
577  /* We made this slot active, so it's ours now. */
578  MyReplicationSlot = s;
579 
580  /*
581  * The call to pgstat_acquire_replslot() protects against stats for a
582  * different slot, from before a restart or such, being present during
583  * pgstat_report_replslot().
584  */
585  if (SlotIsLogical(s))
587 
588  if (am_walsender)
589  {
591  SlotIsLogical(s)
592  ? errmsg("acquired logical replication slot \"%s\"",
593  NameStr(s->data.name))
594  : errmsg("acquired physical replication slot \"%s\"",
595  NameStr(s->data.name)));
596  }
597 }
598 
599 /*
600  * Release the replication slot that this backend considers to own.
601  *
602  * This or another backend can re-acquire the slot later.
603  * Resources this slot requires will be preserved.
 *
 * Behavior depends on the slot's persistency.  An RS_EPHEMERAL slot is
 * deleted.  An RS_PERSISTENT slot is marked inactive (active_pid = 0) so
 * that others waiting for it can take it.  In a walsender, the slot's name
 * and kind are copied up front so the release can still be logged at the
 * end.
 *
 * NOTE(review): original lines 606, 608, 627, 637, 640, 642, 654, 661-662
 * and 667 are missing from this extraction.  They include the name line,
 * the local `slot` declaration, the ephemeral-slot drop call, part of the
 * xmin condition, and the opening of the walsender log report.
604  */
605 void
607 {
609  char *slotname = NULL; /* keep compiler quiet */
610  bool is_logical = false; /* keep compiler quiet */
611 
612  Assert(slot != NULL && slot->active_pid != 0);
613 
614  if (am_walsender)
615  {
		/* copy now: an ephemeral slot is dropped before we get to log it */
616  slotname = pstrdup(NameStr(slot->data.name));
617  is_logical = SlotIsLogical(slot);
618  }
619 
620  if (slot->data.persistency == RS_EPHEMERAL)
621  {
622  /*
623  * Delete the slot. There is no !PANIC case where this is allowed to
624  * fail, all that may happen is an incomplete cleanup of the on-disk
625  * data.
626  */
628  }
629 
630  /*
631  * If slot needed to temporarily restrain both data and catalog xmin to
632  * create the catalog snapshot, remove that temporary constraint.
633  * Snapshots can only be exported while the initial snapshot is still
634  * acquired.
635  */
636  if (!TransactionIdIsValid(slot->data.xmin) &&
638  {
639  SpinLockAcquire(&slot->mutex);
641  SpinLockRelease(&slot->mutex);
643  }
644 
645  if (slot->data.persistency == RS_PERSISTENT)
646  {
647  /*
648  * Mark persistent slot inactive. We're not freeing it, just
649  * disconnecting, but wake up others that may be waiting for it.
650  */
651  SpinLockAcquire(&slot->mutex);
652  slot->active_pid = 0;
653  SpinLockRelease(&slot->mutex);
655  }
656 
657  MyReplicationSlot = NULL;
658 
659  /* might not have been set when we've been a plain slot */
660  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
663  LWLockRelease(ProcArrayLock);
664 
665  if (am_walsender)
666  {
668  is_logical
669  ? errmsg("released logical replication slot \"%s\"",
670  slotname)
671  : errmsg("released physical replication slot \"%s\"",
672  slotname));
673 
674  pfree(slotname);
675  }
676 }
677 
678 /*
679  * Cleanup all temporary slots created in current session.
 *
 * Scans for in-use slots whose active_pid is this process.  For each one
 * found, the slot spinlock and ReplicationSlotControlLock are both released
 * before the slot is handled (to avoid deadlock).  The scan then restarts
 * from the beginning, since the array may have changed while the lock was
 * not held.
 *
 * NOTE(review): original lines 682 (name line), 692 (the per-entry pointer
 * `s`), 700, 704 and 706 are missing from this extraction.  Presumably
 * line 704 is the call that drops the slot.
680  */
681 void
683 {
684  int i;
685 
686  Assert(MyReplicationSlot == NULL);
687 
688 restart:
689  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
690  for (i = 0; i < max_replication_slots; i++)
691  {
693 
694  if (!s->in_use)
695  continue;
696 
697  SpinLockAcquire(&s->mutex);
698  if (s->active_pid == MyProcPid)
699  {
701  SpinLockRelease(&s->mutex);
702  LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
703 
705 
707  goto restart;
708  }
709  else
710  SpinLockRelease(&s->mutex);
711  }
712 
713  LWLockRelease(ReplicationSlotControlLock);
714 }
715 
716 /*
717  * Permanently drop replication slot identified by the passed in name.
 *
 * The slot is acquired first (waiting for it if nowait is false; see
 * ReplicationSlotAcquire).  Slots currently being synced from the primary
 * are then refused with ERROR.
 *
 * NOTE(review): original lines 730 (the condition guarding the ereport
 * below) and 736 (the actual drop of the acquired slot) are missing from
 * this extraction.  As shown, the ERROR would look unconditional.
718  */
719 void
720 ReplicationSlotDrop(const char *name, bool nowait)
721 {
722  Assert(MyReplicationSlot == NULL);
723 
724  ReplicationSlotAcquire(name, nowait);
725 
726  /*
727  * Do not allow users to drop the slots which are currently being synced
728  * from the primary to the standby.
729  */
731  ereport(ERROR,
732  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
733  errmsg("cannot drop replication slot \"%s\"", name),
734  errdetail("This slot is being synced from the primary server."));
735 
737 }
738 
739 /*
740  * Change the definition of the slot identified by the specified name.
 *
 * The only property that can be changed here is the failover flag.  Judging
 * by the error messages below, ERROR is raised for:
 *   - physical slots;
 *   - synced slots while in recovery;
 *   - any attempt to enable failover while in recovery;
 *   - enabling failover on a temporary slot.
 * The slot's data is only modified when the requested failover value
 * differs from the current one.
 *
 * NOTE(review): original lines 747, 749, 761, 789, 791, 793-794 and 797 are
 * missing from this extraction.  They include the slot acquisition, the
 * conditions guarding two of the ereport() calls, and the code around the
 * failover update that presumably locks, persists and releases the slot.
741  */
742 void
743 ReplicationSlotAlter(const char *name, bool failover)
744 {
745  Assert(MyReplicationSlot == NULL);
746 
748 
750  ereport(ERROR,
751  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
752  errmsg("cannot use %s with a physical replication slot",
753  "ALTER_REPLICATION_SLOT"));
754 
755  if (RecoveryInProgress())
756  {
757  /*
758  * Do not allow users to alter the slots which are currently being
759  * synced from the primary to the standby.
760  */
762  ereport(ERROR,
763  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
764  errmsg("cannot alter replication slot \"%s\"", name),
765  errdetail("This slot is being synced from the primary server."));
766 
767  /*
768  * Do not allow users to enable failover on the standby as we do not
769  * support sync to the cascading standby.
770  */
771  if (failover)
772  ereport(ERROR,
773  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
774  errmsg("cannot enable failover for a replication slot"
775  " on the standby"));
776  }
777 
778  /*
779  * Do not allow users to enable failover for temporary slots as we do not
780  * support syncing temporary slots to the standby.
781  */
782  if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
783  ereport(ERROR,
784  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
785  errmsg("cannot enable failover for a temporary replication slot"));
786 
787  if (MyReplicationSlot->data.failover != failover)
788  {
790  MyReplicationSlot->data.failover = failover;
792 
795  }
796 
798 }
799 
800 /*
801  * Permanently drop the currently acquired replication slot.
 *
 * MyReplicationSlot is cleared before the drop itself, so this backend no
 * longer considers the slot acquired from that point on.
 *
 * NOTE(review): original lines 804 (name line), 806 (presumably a local
 * `slot` copy of MyReplicationSlot) and 813 (the drop call) are missing
 * from this extraction.
802  */
803 void
805 {
807 
808  Assert(MyReplicationSlot != NULL);
809 
810  /* slot isn't acquired anymore */
811  MyReplicationSlot = NULL;
812 
814 }
815 
816 /*
817  * Permanently drop the replication slot which will be released by the point
818  * this function returns.
 *
 * Steps, all performed while holding ReplicationSlotAllocationLock
 * exclusively:
 *   1. rename pg_replslot/<name> to pg_replslot/<name>.tmp, then fsync the
 *      renamed directory and pg_replslot;
 *   2. clear active_pid and in_use under ReplicationSlotControlLock;
 *   3. remove the .tmp directory (failure only produces a warning);
 *   4. for logical slots, drop the statistics entry.
 * If the rename fails, active_pid is reset first.  The failure is reported
 * as ERROR for RS_PERSISTENT slots but only as WARNING otherwise, in which
 * case the function carries on with steps 2-4.
 *
 * NOTE(review): original lines 821, 854, 857, 868, 871, 889, 895-896 and
 * 904 are missing from this extraction.  They include:
 *   - the name line;
 *   - what appear to be the start and end of the section that the
 *     "panicking" comment describes;
 *   - the wakeup calls;
 *   - the errcode of the rename failure;
 *   - the limit recomputation described at 891-894;
 *   - the opening of the rmtree warning.
819  */
820 static void
822 {
823  char path[MAXPGPATH];
824  char tmppath[MAXPGPATH];
825 
826  /*
827  * If some other backend ran this code concurrently with us, we might try
828  * to delete a slot with a certain name while someone else was trying to
829  * create a slot with the same name.
830  */
831  LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
832 
833  /* Generate pathnames. */
	/*
	 * NOTE(review): sprintf relies on slot names being short (see
	 * ReplicationSlotValidateName); snprintf would make that explicit.
	 */
834  sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
835  sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
836 
837  /*
838  * Rename the slot directory on disk, so that we'll no longer recognize
839  * this as a valid slot. Note that if this fails, we've got to mark the
840  * slot inactive before bailing out. If we're dropping an ephemeral or a
841  * temporary slot, we better never fail hard as the caller won't expect
842  * the slot to survive and this might get called during error handling.
843  */
844  if (rename(path, tmppath) == 0)
845  {
846  /*
847  * We need to fsync() the directory we just renamed and its parent to
848  * make sure that our changes are on disk in a crash-safe fashion. If
849  * fsync() fails, we can't be sure whether the changes are on disk or
850  * not. For now, we handle that by panicking;
851  * StartupReplicationSlots() will try to straighten it out after
852  * restart.
853  */
855  fsync_fname(tmppath, true);
856  fsync_fname("pg_replslot", true);
858  }
859  else
860  {
861  bool fail_softly = slot->data.persistency != RS_PERSISTENT;
862 
863  SpinLockAcquire(&slot->mutex);
864  slot->active_pid = 0;
865  SpinLockRelease(&slot->mutex);
866 
867  /* wake up anyone waiting on this slot */
869 
		/* at WARNING we fall through: the slot is still freed below */
870  ereport(fail_softly ? WARNING : ERROR,
872  errmsg("could not rename file \"%s\" to \"%s\": %m",
873  path, tmppath)));
874  }
875 
876  /*
877  * The slot is definitely gone. Lock out concurrent scans of the array
878  * long enough to kill it. It's OK to clear the active PID here without
879  * grabbing the mutex because nobody else can be scanning the array here,
880  * and nobody can be attached to this slot and thus access it without
881  * scanning the array.
882  *
883  * Also wake up processes waiting for it.
884  */
885  LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
886  slot->active_pid = 0;
887  slot->in_use = false;
888  LWLockRelease(ReplicationSlotControlLock);
890 
891  /*
892  * Slot is dead and doesn't prevent resource removal anymore, recompute
893  * limits.
894  */
897 
898  /*
899  * If removing the directory fails, the worst thing that will happen is
900  * that the user won't be able to create a new slot with the same name
901  * until the next server restart. We warn about it, but that's all.
902  */
903  if (!rmtree(tmppath, true))
905  (errmsg("could not remove directory \"%s\"", tmppath)));
906 
907  /*
908  * Drop the statistics entry for the replication slot. Do this while
909  * holding ReplicationSlotAllocationLock so that we don't drop a
910  * statistics entry for another slot with the same name just created in
911  * another session.
912  */
913  if (SlotIsLogical(slot))
914  pgstat_drop_replslot(slot);
915 
916  /*
917  * We release this at the very end, so that nobody starts trying to create
918  * a slot while we're still cleaning up the detritus of the old one.
919  */
920  LWLockRelease(ReplicationSlotAllocationLock);
921 }
922 
923 /*
924  * Serialize the currently acquired slot's state from memory to disk, thereby
925  * guaranteeing the current state will survive a crash.
 *
 * The target directory is pg_replslot/<slot name>.
 *
 * NOTE(review): original lines 928 (name line) and 935 (the write call,
 * presumably SaveSlotToPath() with this path) are missing from this
 * extraction.
926  */
927 void
929 {
930  char path[MAXPGPATH];
931 
932  Assert(MyReplicationSlot != NULL);
933 
934  sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
936 }
937 
938 /*
939  * Signal that it would be useful if the currently acquired slot would be
940  * flushed out to disk.
941  *
942  * Note that the actual flush to disk can be delayed for a long time, if
943  * required for correctness explicitly do a ReplicationSlotSave().
 *
 * The visible code only sets MyReplicationSlot->dirty while holding the
 * slot's spinlock.
 *
 * NOTE(review): original lines 946 (name line), 948 (declaration of the
 * local `slot`, presumably an alias of MyReplicationSlot) and 953 are
 * missing from this extraction.
944  */
945 void
947 {
949 
950  Assert(MyReplicationSlot != NULL);
951 
952  SpinLockAcquire(&slot->mutex);
954  MyReplicationSlot->dirty = true;
955  SpinLockRelease(&slot->mutex);
956 }
957 
958 /*
959  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
960  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
 *
 * NOTE(review): most of this function is missing from this extraction
 * (original lines 963, 965, 968, 971 and 974-975).  The missing parts are:
 *   - its name line;
 *   - the local `slot` declaration;
 *   - presumably the persistency update made under the spinlock;
 *   - whatever follows, presumably marking the slot dirty and saving it so
 *     that the crash-safety promised above holds.
961  */
962 void
964 {
966 
967  Assert(slot != NULL);
969 
970  SpinLockAcquire(&slot->mutex);
972  SpinLockRelease(&slot->mutex);
973 
976 }
977 
978 /*
979  * Compute the oldest xmin across all slots and store it in the ProcArray.
980  *
981  * If already_locked is true, ProcArrayLock has already been acquired
982  * exclusively.
 *
 * Two minima are computed independently: one over the slots'
 * effective_xmin and one over their effective_catalog_xmin.  Unused slots,
 * invalidated slots and invalid xids are ignored.  Both results are passed
 * to ProcArraySetReplicationSlotXmin().
 *
 * NOTE(review): original lines 985 (name line with the already_locked
 * parameter), 988 (declaration of agg_xmin) and 997 (the per-entry pointer
 * `s`) are missing from this extraction.
983  */
984 void
986 {
987  int i;
989  TransactionId agg_catalog_xmin = InvalidTransactionId;
990 
991  Assert(ReplicationSlotCtl != NULL);
992 
993  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
994 
995  for (i = 0; i < max_replication_slots; i++)
996  {
998  TransactionId effective_xmin;
999  TransactionId effective_catalog_xmin;
1000  bool invalidated;
1001 
1002  if (!s->in_use)
1003  continue;
1004 
		/* snapshot the fields under the spinlock, then compare without it */
1005  SpinLockAcquire(&s->mutex);
1006  effective_xmin = s->effective_xmin;
1007  effective_catalog_xmin = s->effective_catalog_xmin;
1008  invalidated = s->data.invalidated != RS_INVAL_NONE;
1009  SpinLockRelease(&s->mutex);
1010 
1011  /* invalidated slots need not apply */
1012  if (invalidated)
1013  continue;
1014 
1015  /* check the data xmin */
1016  if (TransactionIdIsValid(effective_xmin) &&
1017  (!TransactionIdIsValid(agg_xmin) ||
1018  TransactionIdPrecedes(effective_xmin, agg_xmin)))
1019  agg_xmin = effective_xmin;
1020 
1021  /* check the catalog xmin */
1022  if (TransactionIdIsValid(effective_catalog_xmin) &&
1023  (!TransactionIdIsValid(agg_catalog_xmin) ||
1024  TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1025  agg_catalog_xmin = effective_catalog_xmin;
1026  }
1027 
1028  LWLockRelease(ReplicationSlotControlLock);
1029 
1030  ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1031 }
1032 
1033 /*
1034  * Compute the oldest restart LSN across all slots and inform xlog module.
1035  *
1036  * Note: while max_slot_wal_keep_size is theoretically relevant for this
1037  * purpose, we don't try to account for that, because this module doesn't
1038  * know what to compare against.
 *
 * Unused slots, invalidated slots and slots whose restart_lsn is
 * InvalidXLogRecPtr are ignored.  If none remain, InvalidXLogRecPtr is
 * passed to XLogSetReplicationSlotMinimumLSN().
 *
 * NOTE(review): original lines 1041 (name line) and 1051 (the per-entry
 * pointer `s`) are missing from this extraction.
1039  */
1040 void
1042 {
1043  int i;
1044  XLogRecPtr min_required = InvalidXLogRecPtr;
1045 
1046  Assert(ReplicationSlotCtl != NULL);
1047 
1048  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1049  for (i = 0; i < max_replication_slots; i++)
1050  {
1052  XLogRecPtr restart_lsn;
1053  bool invalidated;
1054 
1055  if (!s->in_use)
1056  continue;
1057 
1058  SpinLockAcquire(&s->mutex);
1059  restart_lsn = s->data.restart_lsn;
1060  invalidated = s->data.invalidated != RS_INVAL_NONE;
1061  SpinLockRelease(&s->mutex);
1062 
1063  /* invalidated slots need not apply */
1064  if (invalidated)
1065  continue;
1066 
1067  if (restart_lsn != InvalidXLogRecPtr &&
1068  (min_required == InvalidXLogRecPtr ||
1069  restart_lsn < min_required))
1070  min_required = restart_lsn;
1071  }
1072  LWLockRelease(ReplicationSlotControlLock);
1073 
1074  XLogSetReplicationSlotMinimumLSN(min_required);
1075 }
1076 
1077 /*
1078  * Compute the oldest WAL LSN required by *logical* decoding slots.
1079  *
1080  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1081  * slots exist.
1082  *
1083  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1084  * ignores physical replication slots.
1085  *
1086  * The results aren't required frequently, so we don't maintain a precomputed
1087  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 *
 * Invalidated slots and slots with an invalid restart_lsn are skipped too,
 * so InvalidXLogRecPtr is also returned when every logical slot falls into
 * one of those categories.  The function returns immediately when
 * max_replication_slots is zero or negative.
 *
 * NOTE(review): original lines 1090 (name line) and 1106 (the per-entry
 * pointer `s`) are missing from this extraction.
1088  */
1089 XLogRecPtr
1091 {
1092  XLogRecPtr result = InvalidXLogRecPtr;
1093  int i;
1094 
1095  if (max_replication_slots <= 0)
1096  return InvalidXLogRecPtr;
1097 
1098  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1099 
1100  for (i = 0; i < max_replication_slots; i++)
1101  {
1102  ReplicationSlot *s;
1103  XLogRecPtr restart_lsn;
1104  bool invalidated;
1105 
1107 
1108  /* cannot change while ReplicationSlotControlLock is held */
1109  if (!s->in_use)
1110  continue;
1111 
1112  /* we're only interested in logical slots */
1113  if (!SlotIsLogical(s))
1114  continue;
1115 
1116  /* read once, it's ok if it increases while we're checking */
1117  SpinLockAcquire(&s->mutex);
1118  restart_lsn = s->data.restart_lsn;
1119  invalidated = s->data.invalidated != RS_INVAL_NONE;
1120  SpinLockRelease(&s->mutex);
1121 
1122  /* invalidated slots need not apply */
1123  if (invalidated)
1124  continue;
1125 
1126  if (restart_lsn == InvalidXLogRecPtr)
1127  continue;
1128 
1129  if (result == InvalidXLogRecPtr ||
1130  restart_lsn < result)
1131  result = restart_lsn;
1132  }
1133 
1134  LWLockRelease(ReplicationSlotControlLock);
1135 
1136  return result;
1137 }
1138 
1139 /*
1140  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1141  * passed database oid.
1142  *
1143  * Returns true if there are any slots referencing the database. *nslots will
1144  * be set to the absolute number of slots in the database, *nactive to ones
1145  * currently active.
 *
 * Only logical slots are counted, since physical slots are not database
 * specific; invalidated slots are included on purpose.  A slot counts as
 * active when its active_pid is nonzero.  Both outputs are zeroed first, so
 * they are valid even when max_replication_slots is zero or negative.
 *
 * NOTE(review): original line 1162 (the assignment of `s` for entry i) is
 * missing from this extraction.
1146  */
1147 bool
1148 ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1149 {
1150  int i;
1151 
1152  *nslots = *nactive = 0;
1153 
1154  if (max_replication_slots <= 0)
1155  return false;
1156 
1157  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1158  for (i = 0; i < max_replication_slots; i++)
1159  {
1160  ReplicationSlot *s;
1161 
1163 
1164  /* cannot change while ReplicationSlotControlLock is held */
1165  if (!s->in_use)
1166  continue;
1167 
1168  /* only logical slots are database specific, skip */
1169  if (!SlotIsLogical(s))
1170  continue;
1171 
1172  /* not our database, skip */
1173  if (s->data.database != dboid)
1174  continue;
1175 
1176  /* NB: intentionally counting invalidated slots */
1177 
1178  /* count slots with spinlock held */
1179  SpinLockAcquire(&s->mutex);
1180  (*nslots)++;
1181  if (s->active_pid != 0)
1182  (*nactive)++;
1183  SpinLockRelease(&s->mutex);
1184  }
1185  LWLockRelease(ReplicationSlotControlLock);
1186 
1187  if (*nslots > 0)
1188  return true;
1189  return false;
1190 }
1191 
1192 /*
1193  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1194  * passed database oid. The caller should hold an exclusive lock on the
1195  * pg_database oid for the database to prevent creation of new slots on the db
1196  * or replay from existing slots.
1197  *
1198  * Another session that concurrently acquires an existing slot on the target DB
1199  * (most likely to drop it) may cause this function to ERROR. If that happens
1200  * it may have dropped some but not all slots.
1201  *
1202  * This routine isn't as efficient as it could be - but we don't drop
1203  * databases often, especially databases with lots of slots.
 *
 * NOTE(review): this listing is missing source lines (its embedded line
 * numbers skip): the line naming the function and its `dboid` parameter,
 * the statement that points `s` at the current slot, and the drop call made
 * after ReplicationSlotControlLock is released (per the comment there,
 * presumably ReplicationSlotDropAcquired()). Restore them from the canonical
 * slot.c.
1204  */
1205 void
1207 {
1208  int i;
1209 
1210  if (max_replication_slots <= 0)
1211  return;
1212 
/* the scan restarts here after every drop, since the lock was released */
1213 restart:
1214  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1215  for (i = 0; i < max_replication_slots; i++)
1216  {
1217  ReplicationSlot *s;
1218  char *slotname;
1219  int active_pid;
1220 
/* NOTE(review): the assignment of `s` is missing here in this listing. */
1222 
1223  /* cannot change while ReplicationSlotCtlLock is held */
1224  if (!s->in_use)
1225  continue;
1226 
1227  /* only logical slots are database specific, skip */
1228  if (!SlotIsLogical(s))
1229  continue;
1230 
1231  /* not our database, skip */
1232  if (s->data.database != dboid)
1233  continue;
1234 
1235  /* NB: intentionally including invalidated slots */
1236 
1237  /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1238  SpinLockAcquire(&s->mutex);
1239  /* can't change while ReplicationSlotControlLock is held */
1240  slotname = NameStr(s->data.name);
1241  active_pid = s->active_pid;
1242  if (active_pid == 0)
1243  {
1244  MyReplicationSlot = s;
1245  s->active_pid = MyProcPid;
1246  }
1247  SpinLockRelease(&s->mutex);
1248 
1249  /*
1250  * Even though we hold an exclusive lock on the database object a
1251  * logical slot for that DB can still be active, e.g. if it's
1252  * concurrently being dropped by a backend connected to another DB.
1253  *
1254  * That's fairly unlikely in practice, so we'll just bail out.
1255  *
1256  * The slot sync worker holds a shared lock on the database before
1257  * operating on synced logical slots to avoid conflict with the drop
1258  * happening here. The persistent synced slots are thus safe but there
1259  * is a possibility that the slot sync worker has created a temporary
1260  * slot (which stays active even on release) and we are trying to drop
1261  * that here. In practice, the chances of hitting this scenario are
1262  * less as during slot synchronization, the temporary slot is
1263  * immediately converted to persistent and thus is safe due to the
1264  * shared lock taken on the database. So, we'll just bail out in such
1265  * a case.
1266  *
1267  * XXX: We can consider shutting down the slot sync worker before
1268  * trying to drop synced temporary slots here.
1269  */
1270  if (active_pid)
1271  ereport(ERROR,
1272  (errcode(ERRCODE_OBJECT_IN_USE),
1273  errmsg("replication slot \"%s\" is active for PID %d",
1274  slotname, active_pid)));
1275 
1276  /*
1277  * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1278  * holding ReplicationSlotControlLock over filesystem operations,
1279  * release ReplicationSlotControlLock and use
1280  * ReplicationSlotDropAcquired.
1281  *
1282  * As that means the set of slots could change, restart scan from the
1283  * beginning each time we release the lock.
1284  */
1285  LWLockRelease(ReplicationSlotControlLock);
/*
 * NOTE(review): the drop call itself is missing here in this listing; as
 * shown, the slot would stay acquired and the rescan would then report it as
 * active for our own PID.
 */
1287  goto restart;
1288  }
1289  LWLockRelease(ReplicationSlotControlLock);
1290 }
1291 
1292 
1293 /*
1294  * Check whether the server's configuration supports using replication
1295  * slots.
 *
 * Raises ERROR with ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE when
 * max_replication_slots is 0 and, judging by the second message, when
 * wal_level is below replica.
 *
 * NOTE(review): this listing is missing the line naming the function and the
 * `if` that guards the second ereport (its embedded line numbers skip). As
 * shown, that ereport would fire unconditionally.
1296  */
1297 void
1299 {
1300  /*
1301  * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1302  * needs the same check.
1303  */
1304 
1305  if (max_replication_slots == 0)
1306  ereport(ERROR,
1307  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308  errmsg("replication slots can only be used if max_replication_slots > 0")));
1309 
/* NOTE(review): guarding condition missing here; presumably wal_level < WAL_LEVEL_REPLICA. */
1311  ereport(ERROR,
1312  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1313  errmsg("replication slots can only be used if wal_level >= replica")));
1314 }
1315 
1316 /*
1317  * Check whether the user has privilege to use replication slots.
 *
 * Raises ERROR with ERRCODE_INSUFFICIENT_PRIVILEGE unless has_rolreplication()
 * reports that the current user (GetUserId()) has the REPLICATION attribute.
 *
 * NOTE(review): the line naming the function is missing from this listing
 * (its embedded line numbers skip).
1318  */
1319 void
1321 {
1322  if (!has_rolreplication(GetUserId()))
1323  ereport(ERROR,
1324  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1325  errmsg("permission denied to use replication slots"),
1326  errdetail("Only roles with the %s attribute may use replication slots.",
1327  "REPLICATION")));
1328 }
1329 
1330 /*
1331  * Reserve WAL for the currently active slot.
1332  *
1333  * Compute and set restart_lsn in a manner that's appropriate for the type of
1334  * the slot and concurrency safe.
 *
 * Loops until the segment for the chosen restart_lsn has not yet been
 * removed (checked against XLogGetLastRemovedSegno()). Afterwards, for a
 * logical slot outside recovery, a standby snapshot is logged and flushed.
 *
 * NOTE(review): this listing is missing source lines (its embedded line
 * numbers skip): the line naming the function, the declaration of `slot`,
 * a second assertion, the call made right after "prevent WAL removal as fast
 * as possible" (presumably ReplicationSlotsComputeRequiredLSN(), which the
 * comments reference), and the assignment of `segno` (presumably the segment
 * holding the new restart_lsn). Restore them from the canonical slot.c.
1335  */
1336 void
1338 {
1340 
1341  Assert(slot != NULL);
1343 
1344  /*
1345  * The replication slot mechanism is used to prevent removal of required
1346  * WAL. As there is no interlock between this routine and checkpoints, WAL
1347  * segments could concurrently be removed when a now stale return value of
1348  * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1349  * this happens we'll just retry.
1350  */
1351  while (true)
1352  {
1353  XLogSegNo segno;
1354  XLogRecPtr restart_lsn;
1355 
1356  /*
1357  * For logical slots log a standby snapshot and start logical decoding
1358  * at exactly that position. That allows the slot to start up more
1359  * quickly. But on a standby we cannot do WAL writes, so just use the
1360  * replay pointer; effectively, an attempt to create a logical slot on
1361  * standby will cause it to wait for an xl_running_xact record to be
1362  * logged independently on the primary, so that a snapshot can be
1363  * built using the record.
1364  *
1365  * None of this is needed (or indeed helpful) for physical slots as
1366  * they'll start replay at the last logged checkpoint anyway. Instead
1367  * return the location of the last redo LSN. While that slightly
1368  * increases the chance that we have to retry, it's where a base
1369  * backup has to start replay at.
1370  */
1371  if (SlotIsPhysical(slot))
1372  restart_lsn = GetRedoRecPtr();
1373  else if (RecoveryInProgress())
1374  restart_lsn = GetXLogReplayRecPtr(NULL);
1375  else
1376  restart_lsn = GetXLogInsertRecPtr();
1377 
1378  SpinLockAcquire(&slot->mutex);
1379  slot->data.restart_lsn = restart_lsn;
1380  SpinLockRelease(&slot->mutex);
1381 
1382  /* prevent WAL removal as fast as possible */
1384 
1385  /*
1386  * If all required WAL is still there, great, otherwise retry. The
1387  * slot should prevent further removal of WAL, unless there's a
1388  * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1389  * the new restart_lsn above, so normally we should never need to loop
1390  * more than twice.
1391  */
1393  if (XLogGetLastRemovedSegno() < segno)
1394  break;
1395  }
1396 
1397  if (!RecoveryInProgress() && SlotIsLogical(slot))
1398  {
1399  XLogRecPtr flushptr;
1400 
1401  /* make sure we have enough information to start */
1402  flushptr = LogStandbySnapshot();
1403 
1404  /* and make sure it's fsynced to disk */
1405  XLogFlush(flushptr);
1406  }
1407 }
1408 
1409 /*
1410  * Report that replication slot needs to be invalidated
 *
 * Emits a LOG message: "terminating process ..." when `terminating` is true,
 * otherwise "invalidating obsolete replication slot ...", with a
 * cause-specific detail. Only RS_INVAL_WAL_REMOVED adds the
 * max_slot_wal_keep_size hint.
 *
 * NOTE(review): the line naming the function and declaring its first
 * parameter, `cause` (switched on below), is missing from this listing (its
 * embedded line numbers skip).
1411  */
1412 static void
1414  bool terminating,
1415  int pid,
1416  NameData slotname,
1417  XLogRecPtr restart_lsn,
1418  XLogRecPtr oldestLSN,
1419  TransactionId snapshotConflictHorizon)
1420 {
1421  StringInfoData err_detail;
1422  bool hint = false;
1423 
1424  initStringInfo(&err_detail);
1425 
1426  switch (cause)
1427  {
1428  case RS_INVAL_WAL_REMOVED:
1429  {
/* distance in bytes between restart_lsn and oldestLSN */
1430  unsigned long long ex = oldestLSN - restart_lsn;
1431 
1432  hint = true;
1433  appendStringInfo(&err_detail,
1434  ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1435  "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1436  ex),
1437  LSN_FORMAT_ARGS(restart_lsn),
1438  ex);
1439  break;
1440  }
1441  case RS_INVAL_HORIZON:
1442  appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1443  snapshotConflictHorizon);
1444  break;
1445 
1446  case RS_INVAL_WAL_LEVEL:
1447  appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
1448  break;
1449  case RS_INVAL_NONE:
1450  pg_unreachable();
1451  }
1452 
/* err_detail was already translated above (_() / ngettext()), hence errdetail_internal() */
1453  ereport(LOG,
1454  terminating ?
1455  errmsg("terminating process %d to release replication slot \"%s\"",
1456  pid, NameStr(slotname)) :
1457  errmsg("invalidating obsolete replication slot \"%s\"",
1458  NameStr(slotname)),
1459  errdetail_internal("%s", err_detail.data),
1460  hint ? errhint("You might need to increase %s.", "max_slot_wal_keep_size") : 0);
1461 
1462  pfree(err_detail.data);
1463 }
1464 
1465 /*
1466  * Helper for InvalidateObsoleteReplicationSlots
1467  *
1468  * Acquires the given slot and marks it invalid, if necessary and possible.
1469  *
1470  * Returns whether ReplicationSlotControlLock was released in the interim (and
1471  * in that case we're not holding the lock at return, otherwise we are).
1472  *
1473  * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1474  *
1475  * This is inherently racy, because we release the LWLock
1476  * for syscalls, so caller must restart if we return true.
 *
 * Must be entered with ReplicationSlotControlLock held in shared mode (this
 * is asserted on every loop iteration). If another process owns a
 * conflicting slot, that process is signalled (once per PID) and we wait for
 * the slot to be released before re-checking.
 *
 * NOTE(review): this listing is missing source lines (its embedded line
 * numbers skip). The missing lines are:
 * - the line naming the function and its `cause` parameter;
 * - the declarations of `conflict_prev` and `conflict`;
 * - the body of the `if (conflict == RS_INVAL_WAL_REMOVED)` below;
 * - the prepare-to-sleep and sleep calls on the slot's condition variable;
 * - the trailing arguments of SendProcSignal();
 * - the statements that persist the invalidated state.
 * Restore them from the canonical slot.c.
1477  */
1478 static bool
1480  ReplicationSlot *s,
1481  XLogRecPtr oldestLSN,
1482  Oid dboid, TransactionId snapshotConflictHorizon,
1483  bool *invalidated)
1484 {
1485  int last_signaled_pid = 0;
1486  bool released_lock = false;
1487  bool terminated = false;
/*
 * NOTE(review): the two *_xmin snapshots below hold transaction IDs. They are
 * copied from effective_xmin/effective_catalog_xmin and passed to
 * TransactionIdIsValid()/TransactionIdPrecedesOrEquals(). Yet they are
 * declared XLogRecPtr and initialized with InvalidXLogRecPtr.
 * TransactionId/InvalidTransactionId would match their use; confirm and fix.
 */
1488  XLogRecPtr initial_effective_xmin = InvalidXLogRecPtr;
1489  XLogRecPtr initial_catalog_effective_xmin = InvalidXLogRecPtr;
1490  XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
1492 
1493  for (;;)
1494  {
1495  XLogRecPtr restart_lsn;
1496  NameData slotname;
1497  int active_pid = 0;
1499 
1500  Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1501 
1502  if (!s->in_use)
1503  {
1504  if (released_lock)
1505  LWLockRelease(ReplicationSlotControlLock);
1506  break;
1507  }
1508 
1509  /*
1510  * Check if the slot needs to be invalidated. If it needs to be
1511  * invalidated, and is not currently acquired, acquire it and mark it
1512  * as having been invalidated. We do this with the spinlock held to
1513  * avoid race conditions -- for example the restart_lsn could move
1514  * forward, or the slot could be dropped.
1515  */
1516  SpinLockAcquire(&s->mutex);
1517 
1518  restart_lsn = s->data.restart_lsn;
1519 
1520  /*
1521  * If the slot is already invalid or is a non conflicting slot, we
1522  * don't need to do anything.
1523  */
1524  if (s->data.invalidated == RS_INVAL_NONE)
1525  {
1526  /*
1527  * The slot's mutex will be released soon, and it is possible that
1528  * those values change since the process holding the slot has been
1529  * terminated (if any), so record them here to ensure that we
1530  * would report the correct conflict cause.
1531  */
1532  if (!terminated)
1533  {
1534  initial_restart_lsn = s->data.restart_lsn;
1535  initial_effective_xmin = s->effective_xmin;
1536  initial_catalog_effective_xmin = s->effective_catalog_xmin;
1537  }
1538 
1539  switch (cause)
1540  {
1541  case RS_INVAL_WAL_REMOVED:
1542  if (initial_restart_lsn != InvalidXLogRecPtr &&
1543  initial_restart_lsn < oldestLSN)
1544  conflict = cause;
1545  break;
1546  case RS_INVAL_HORIZON:
1547  if (!SlotIsLogical(s))
1548  break;
1549  /* invalid DB oid signals a shared relation */
1550  if (dboid != InvalidOid && dboid != s->data.database)
1551  break;
1552  if (TransactionIdIsValid(initial_effective_xmin) &&
1553  TransactionIdPrecedesOrEquals(initial_effective_xmin,
1554  snapshotConflictHorizon))
1555  conflict = cause;
1556  else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1557  TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1558  snapshotConflictHorizon))
1559  conflict = cause;
1560  break;
1561  case RS_INVAL_WAL_LEVEL:
1562  if (SlotIsLogical(s))
1563  conflict = cause;
1564  break;
1565  case RS_INVAL_NONE:
1566  pg_unreachable();
1567  }
1568  }
1569 
1570  /*
1571  * The conflict cause recorded previously should not change while the
1572  * process owning the slot (if any) has been terminated.
1573  */
1574  Assert(!(conflict_prev != RS_INVAL_NONE && terminated &&
1575  conflict_prev != conflict));
1576 
1577  /* if there's no conflict, we're done */
1578  if (conflict == RS_INVAL_NONE)
1579  {
1580  SpinLockRelease(&s->mutex);
1581  if (released_lock)
1582  LWLockRelease(ReplicationSlotControlLock);
1583  break;
1584  }
1585 
1586  slotname = s->data.name;
1587  active_pid = s->active_pid;
1588 
1589  /*
1590  * If the slot can be acquired, do so and mark it invalidated
1591  * immediately. Otherwise we'll signal the owning process, below, and
1592  * retry.
1593  */
1594  if (active_pid == 0)
1595  {
1596  MyReplicationSlot = s;
1597  s->active_pid = MyProcPid;
1598  s->data.invalidated = conflict;
1599 
1600  /*
1601  * XXX: We should consider not overwriting restart_lsn and instead
1602  * just rely on .invalidated.
1603  */
1604  if (conflict == RS_INVAL_WAL_REMOVED)
/*
 * NOTE(review): the body of this `if` is missing here in this listing; per
 * the XXX above it presumably overwrites restart_lsn. As shown, the `if`
 * would instead guard the "*invalidated = true;" statement below.
 */
1606 
1607  /* Let caller know */
1608  *invalidated = true;
1609  }
1610 
1611  SpinLockRelease(&s->mutex);
1612 
1613  /*
1614  * The logical replication slots shouldn't be invalidated as GUC
1615  * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
1616  * check_old_cluster_for_valid_slots() where we ensure that no logical
1617  * slot was invalidated before the upgrade.
1618  */
1619  Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
1620 
1621  if (active_pid != 0)
1622  {
1623  /*
1624  * Prepare the sleep on the slot's condition variable before
1625  * releasing the lock, to close a possible race condition if the
1626  * slot is released before the sleep below.
1627  */
/* NOTE(review): the prepare-to-sleep call is missing here in this listing. */
1629 
1630  LWLockRelease(ReplicationSlotControlLock);
1631  released_lock = true;
1632 
1633  /*
1634  * Signal to terminate the process that owns the slot, if we
1635  * haven't already signalled it. (Avoidance of repeated
1636  * signalling is the only reason for there to be a loop in this
1637  * routine; otherwise we could rely on caller's restart loop.)
1638  *
1639  * There is the race condition that other process may own the slot
1640  * after its current owner process is terminated and before this
1641  * process owns it. To handle that, we signal only if the PID of
1642  * the owning process has changed from the previous time. (This
1643  * logic assumes that the same PID is not reused very quickly.)
1644  */
1645  if (last_signaled_pid != active_pid)
1646  {
1647  ReportSlotInvalidation(conflict, true, active_pid,
1648  slotname, restart_lsn,
1649  oldestLSN, snapshotConflictHorizon);
1650 
/* the startup process signals via SendProcSignal(); others send SIGTERM */
1651  if (MyBackendType == B_STARTUP)
1652  (void) SendProcSignal(active_pid,
/* NOTE(review): the remaining SendProcSignal() arguments are missing here in this listing. */
1655  else
1656  (void) kill(active_pid, SIGTERM);
1657 
1658  last_signaled_pid = active_pid;
1659  terminated = true;
1660  conflict_prev = conflict;
1661  }
1662 
1663  /* Wait until the slot is released. */
/* NOTE(review): the start of the sleep call is missing here in this listing. */
1665  WAIT_EVENT_REPLICATION_SLOT_DROP);
1666 
1667  /*
1668  * Re-acquire lock and start over; we expect to invalidate the
1669  * slot next time (unless another process acquires the slot in the
1670  * meantime).
1671  */
1672  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1673  continue;
1674  }
1675  else
1676  {
1677  /*
1678  * We hold the slot now and have already invalidated it; flush it
1679  * to ensure that state persists.
1680  *
1681  * Don't want to hold ReplicationSlotControlLock across file
1682  * system operations, so release it now but be sure to tell caller
1683  * to restart from scratch.
1684  */
1685  LWLockRelease(ReplicationSlotControlLock);
1686  released_lock = true;
1687 
1688  /* Make sure the invalidated state persists across server restart */
/* NOTE(review): the statements that save and release the slot are missing here in this listing. */
1693 
1694  ReportSlotInvalidation(conflict, false, active_pid,
1695  slotname, restart_lsn,
1696  oldestLSN, snapshotConflictHorizon);
1697 
1698  /* done with this slot for now */
1699  break;
1700  }
1701  }
1702 
1703  Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1704 
1705  return released_lock;
1706 }
1707 
1708 /*
1709  * Invalidate slots that require resources about to be removed.
1710  *
1711  * Returns true if any slot has been invalidated.
1712  *
1713  * Whether a slot needs to be invalidated depends on the cause. A slot is
1714  * invalidated if it:
1715  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1716  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1717  * db; dboid may be InvalidOid for shared relations
1718  * - RS_INVAL_WAL_LEVEL: is logical
1719  *
1720  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
 *
 * NOTE(review): this listing is missing source lines (its embedded line
 * numbers skip): the line naming the function and its `cause` parameter,
 * the declaration of `s` inside the loop, and the two statements that
 * recalculate the resource limits after an invalidation. Restore them from
 * the canonical slot.c.
1721  */
1722 bool
1724  XLogSegNo oldestSegno, Oid dboid,
1725  TransactionId snapshotConflictHorizon)
1726 {
1727  XLogRecPtr oldestLSN;
1728  bool invalidated = false;
1729 
1730  Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1731  Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1732  Assert(cause != RS_INVAL_NONE);
1733 
1734  if (max_replication_slots == 0)
1735  return invalidated;
1736 
/* first byte of oldestSegno; slots whose restart_lsn precedes it conflict */
1737  XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1738 
1739 restart:
1740  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1741  for (int i = 0; i < max_replication_slots; i++)
1742  {
/* NOTE(review): the declaration of `s` is missing here in this listing. */
1744 
1745  if (!s->in_use)
1746  continue;
1747 
/* a true return means the helper released ReplicationSlotControlLock */
1748  if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1749  snapshotConflictHorizon,
1750  &invalidated))
1751  {
1752  /* if the lock was released, start from scratch */
1753  goto restart;
1754  }
1755  }
1756  LWLockRelease(ReplicationSlotControlLock);
1757 
1758  /*
1759  * If any slots have been invalidated, recalculate the resource limits.
1760  */
1761  if (invalidated)
1762  {
/* NOTE(review): the two recalculation calls are missing here in this listing. */
1765  }
1766 
1767  return invalidated;
1768 }
1769 
1770 /*
1771  * Flush all replication slots to disk.
1772  *
1773  * It is convenient to flush dirty replication slots at the time of checkpoint.
1774  * Additionally, in case of a shutdown checkpoint, we also identify the slots
1775  * for which the confirmed_flush LSN has been updated since the last time it
1776  * was saved and flush them.
 *
 * Each slot is written with SaveSlotToPath(..., LOG), so a failure to save
 * one slot is logged rather than raised.
 *
 * NOTE(review): this listing is missing source lines (its embedded line
 * numbers skip): the line naming the function and its `is_shutdown`
 * parameter, the declaration of `s`, one statement taken under the slot
 * mutex, and the second operand of the `&&` condition in the shutdown branch.
 * Restore them from the canonical slot.c.
1777  */
1778 void
1780 {
1781  int i;
1782 
1783  elog(DEBUG1, "performing replication slot checkpoint");
1784 
1785  /*
1786  * Prevent any slot from being created/dropped while we're active. As we
1787  * explicitly do *not* want to block iterating over replication_slots or
1788  * acquiring a slot we cannot take the control lock - but that's OK,
1789  * because holding ReplicationSlotAllocationLock is strictly stronger, and
1790  * enough to guarantee that nobody can change the in_use bits on us.
1791  */
1792  LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1793 
1794  for (i = 0; i < max_replication_slots; i++)
1795  {
/* NOTE(review): the declaration of `s` is missing here in this listing. */
1797  char path[MAXPGPATH];
1798 
1799  if (!s->in_use)
1800  continue;
1801 
1802  /* save the slot to disk, locking is handled in SaveSlotToPath() */
1803  sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1804 
1805  /*
1806  * Slot's data is not flushed each time the confirmed_flush LSN is
1807  * updated as that could lead to frequent writes. However, we decide
1808  * to force a flush of all logical slot's data at the time of shutdown
1809  * if the confirmed_flush LSN is changed since we last flushed it to
1810  * disk. This helps in avoiding an unnecessary retreat of the
1811  * confirmed_flush LSN after restart.
1812  */
1813  if (is_shutdown && SlotIsLogical(s))
1814  {
1815  SpinLockAcquire(&s->mutex);
1816 
1818 
/* NOTE(review): the second operand of this condition is missing from this listing. */
1819  if (s->data.invalidated == RS_INVAL_NONE &&
1821  {
1822  s->just_dirtied = true;
1823  s->dirty = true;
1824  }
1825  SpinLockRelease(&s->mutex);
1826  }
1827 
1828  SaveSlotToPath(s, path, LOG);
1829  }
1830  LWLockRelease(ReplicationSlotAllocationLock);
1831 }
1832 
1833 /*
1834  * Load all replication slots from disk into memory at server startup. This
1835  * needs to be run before we start crash recovery.
 *
 * Leftover "*.tmp" directories (from an interrupted create/drop) are
 * removed; every other directory entry is handed to RestoreSlotFromDisk().
 *
 * NOTE(review): this listing is missing the line naming the function and
 * the two statements after the final comment (its embedded line numbers
 * skip); restore them from the canonical slot.c.
1836  */
1837 void
1839 {
1840  DIR *replication_dir;
1841  struct dirent *replication_de;
1842 
1843  elog(DEBUG1, "starting up replication slots");
1844 
1845  /* restore all slots by iterating over all on-disk entries */
1846  replication_dir = AllocateDir("pg_replslot");
1847  while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1848  {
1849  char path[MAXPGPATH + 12];
1850  PGFileType de_type;
1851 
1852  if (strcmp(replication_de->d_name, ".") == 0 ||
1853  strcmp(replication_de->d_name, "..") == 0)
1854  continue;
1855 
1856  snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1857  de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1858 
1859  /* we're only creating directories here, skip if it's not ours */
/* entries whose type could not be determined (PGFILETYPE_ERROR) are not skipped */
1860  if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
1861  continue;
1862 
1863  /* we crashed while a slot was being setup or deleted, clean up */
1864  if (pg_str_endswith(replication_de->d_name, ".tmp"))
1865  {
1866  if (!rmtree(path, true))
1867  {
1868  ereport(WARNING,
1869  (errmsg("could not remove directory \"%s\"",
1870  path)));
1871  continue;
1872  }
1873  fsync_fname("pg_replslot", true);
1874  continue;
1875  }
1876 
1877  /* looks like a slot in a normal state, restore */
1878  RestoreSlotFromDisk(replication_de->d_name);
1879  }
1880  FreeDir(replication_dir);
1881 
1882  /* currently no slots exist, we're done. */
1883  if (max_replication_slots <= 0)
1884  return;
1885 
1886  /* Now that we have recovered all the data, compute replication xmin */
/* NOTE(review): the computation calls are missing here in this listing. */
1889 }
1890 
1891 /* ----
1892  * Manipulation of on-disk state of replication slots
1893  *
1894  * NB: none of the routines below should take any notice whether a slot is the
1895  * current one or not, that's all handled a layer above.
1896  * ----
1897  */
/*
 * Create the on-disk representation of a new slot. The state file is
 * written into "pg_replslot/<name>.tmp" via SaveSlotToPath(..., ERROR). That
 * directory is then renamed to "pg_replslot/<name>" and fsynced together with
 * pg_replslot. Any failure raises ERROR.
 *
 * NOTE(review): this listing is missing source lines (its embedded line
 * numbers skip): the line naming the function and its `slot` parameter, the
 * errcode argument of both ereport(ERROR) calls (presumably
 * errcode_for_file_access()), and the line that opens the critical section
 * ended by END_CRIT_SECTION() below. Restore them from the canonical slot.c.
 */
1898 static void
1900 {
1901  char tmppath[MAXPGPATH];
1902  char path[MAXPGPATH];
1903  struct stat st;
1904 
1905  /*
1906  * No need to take out the io_in_progress_lock, nobody else can see this
1907  * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1908  * takes out the lock, if we'd take the lock here, we'd deadlock.
1909  */
1910 
1911  sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1912  sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1913 
1914  /*
1915  * It's just barely possible that some previous effort to create or drop a
1916  * slot with this name left a temp directory lying around. If that seems
1917  * to be the case, try to remove it. If the rmtree() fails, we'll error
1918  * out at the MakePGDirectory() below, so we don't bother checking
1919  * success.
1920  */
1921  if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1922  rmtree(tmppath, true);
1923 
1924  /* Create and fsync the temporary slot directory. */
1925  if (MakePGDirectory(tmppath) < 0)
1926  ereport(ERROR,
1928  errmsg("could not create directory \"%s\": %m",
1929  tmppath)));
1930  fsync_fname(tmppath, true);
1931 
1932  /* Write the actual state file. */
1933  slot->dirty = true; /* signal that we really need to write */
1934  SaveSlotToPath(slot, tmppath, ERROR);
1935 
1936  /* Rename the directory into place. */
1937  if (rename(tmppath, path) != 0)
1938  ereport(ERROR,
1940  errmsg("could not rename file \"%s\" to \"%s\": %m",
1941  tmppath, path)));
1942 
1943  /*
1944  * If we'd now fail - really unlikely - we wouldn't know whether this slot
1945  * would persist after an OS crash or not - so, force a restart. The
1946  * restart would try to fsync this again till it works.
1947  */
/* NOTE(review): the critical-section start is missing here in this listing. */
1949 
1950  fsync_fname(path, true);
1951  fsync_fname("pg_replslot", true);
1952 
1953  END_CRIT_SECTION();
1954 }
1955 
1956 /*
1957  * Shared functionality between saving and creating a replication slot.
 *
 * Writes the slot's persistent data plus magic, version and CRC-32C into
 * "<dir>/state.tmp". It then fsyncs that file, renames it to "<dir>/state",
 * and fsyncs the file, `dir` and pg_replslot. Does nothing if the slot is
 * not dirty.
 *
 * `elevel` is the level used for ereport() on I/O failure. When it does not
 * throw, the function returns early and the dirty flag stays set, because it
 * is only cleared after a successful write.
 *
 * NOTE(review): this listing is missing many source lines (its embedded line
 * numbers skip). The missing lines are:
 * - the declaration of `cp`;
 * - the lock acquisition that the comments below refer to, and its releases;
 * - the errcode argument of each ereport();
 * - pgstat_report_wait_end() calls;
 * - the cp.length assignment;
 * - the length argument of COMP_CRC32C();
 * - the cleanup on the write/fsync error paths;
 * - START_CRIT_SECTION();
 * - the statement that records the saved confirmed_flush.
 * Restore them from the canonical slot.c.
1958  */
1959 static void
1960 SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1961 {
1962  char tmppath[MAXPGPATH];
1963  char path[MAXPGPATH];
1964  int fd;
1966  bool was_dirty;
1967 
1968  /* first check whether there's something to write out */
1969  SpinLockAcquire(&slot->mutex);
1970  was_dirty = slot->dirty;
1971  slot->just_dirtied = false;
1972  SpinLockRelease(&slot->mutex);
1973 
1974  /* and don't do anything if there's nothing to write */
1975  if (!was_dirty)
1976  return;
1977 
1979 
1980  /* silence valgrind :( */
1981  memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1982 
1983  sprintf(tmppath, "%s/state.tmp", dir);
1984  sprintf(path, "%s/state", dir);
1985 
1986  fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1987  if (fd < 0)
1988  {
1989  /*
1990  * If not an ERROR, then release the lock before returning. In case
1991  * of an ERROR, the error recovery path automatically releases the
1992  * lock, but no harm in explicitly releasing even in that case. Note
1993  * that LWLockRelease() could affect errno.
1994  */
1995  int save_errno = errno;
1996 
1998  errno = save_errno;
1999  ereport(elevel,
2001  errmsg("could not create file \"%s\": %m",
2002  tmppath)));
2003  return;
2004  }
2005 
2006  cp.magic = SLOT_MAGIC;
2007  INIT_CRC32C(cp.checksum);
2008  cp.version = SLOT_VERSION;
2010 
/* snapshot the persistent data under the mutex */
2011  SpinLockAcquire(&slot->mutex);
2012 
2013  memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2014 
2015  SpinLockRelease(&slot->mutex);
2016 
/* the checksum skips the leading not-checksummed header bytes */
2017  COMP_CRC32C(cp.checksum,
2018  (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2020  FIN_CRC32C(cp.checksum);
2021 
2022  errno = 0;
2023  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2024  if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2025  {
2026  int save_errno = errno;
2027 
2031 
2032  /* if write didn't set errno, assume problem is no disk space */
2033  errno = save_errno ? save_errno : ENOSPC;
2034  ereport(elevel,
2036  errmsg("could not write to file \"%s\": %m",
2037  tmppath)));
2038  return;
2039  }
2041 
2042  /* fsync the temporary file */
2043  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2044  if (pg_fsync(fd) != 0)
2045  {
2046  int save_errno = errno;
2047 
2051  errno = save_errno;
2052  ereport(elevel,
2054  errmsg("could not fsync file \"%s\": %m",
2055  tmppath)));
2056  return;
2057  }
2059 
2060  if (CloseTransientFile(fd) != 0)
2061  {
2062  int save_errno = errno;
2063 
2065  errno = save_errno;
2066  ereport(elevel,
2068  errmsg("could not close file \"%s\": %m",
2069  tmppath)));
2070  return;
2071  }
2072 
2073  /* rename to permanent file, fsync file and directory */
2074  if (rename(tmppath, path) != 0)
2075  {
2076  int save_errno = errno;
2077 
2079  errno = save_errno;
2080  ereport(elevel,
2082  errmsg("could not rename file \"%s\" to \"%s\": %m",
2083  tmppath, path)));
2084  return;
2085  }
2086 
2087  /*
2088  * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2089  */
2091 
2092  fsync_fname(path, false);
2093  fsync_fname(dir, true);
2094  fsync_fname("pg_replslot", true);
2095 
2096  END_CRIT_SECTION();
2097 
2098  /*
2099  * Successfully wrote, unset dirty bit, unless somebody dirtied again
2100  * already and remember the confirmed_flush LSN value.
2101  */
2102  SpinLockAcquire(&slot->mutex);
2103  if (!slot->just_dirtied)
2104  slot->dirty = false;
2106  SpinLockRelease(&slot->mutex);
2107 
2109 }
2110 
2111 /*
2112  * Load a single slot from disk into memory.
 *
 * Reads "pg_replslot/<name>/state" (after removing any leftover state.tmp)
 * and fsyncs it and its directory. It then validates magic, version, length
 * and CRC-32C, PANICking on any I/O or validation failure. Ephemeral slots
 * are deleted instead of restored. A slot whose type is not supported by the
 * current wal_level is FATAL. Otherwise the data is copied into the first
 * unused shared slot entry; FATAL if there is none.
 *
 * NOTE(review): this listing is missing many source lines (its embedded line
 * numbers skip). The missing lines are:
 * - the line naming the function and its `name` parameter;
 * - the declaration of `cp`;
 * - the errcode argument of most ereport() calls;
 * - pgstat_report_wait_end() calls;
 * - START_CRIT_SECTION();
 * - the size argument of the first short-read message;
 * - the length-check `if`, the ephemeral-slot `if` and the logical/wal_level
 *   `if` (as shown, those three paths would run unconditionally);
 * - the CRC arguments;
 * - the assignment of `slot` in the loop;
 * - part of the in-memory initialization.
 * Restore them from the canonical slot.c.
2113  */
2114 static void
2116 {
2118  int i;
2119  char slotdir[MAXPGPATH + 12];
2120  char path[MAXPGPATH + 22];
2121  int fd;
2122  bool restored = false;
2123  int readBytes;
2124  pg_crc32c checksum;
2125 
2126  /* no need to lock here, no concurrent access allowed yet */
2127 
2128  /* delete temp file if it exists */
2129  sprintf(slotdir, "pg_replslot/%s", name);
2130  sprintf(path, "%s/state.tmp", slotdir);
2131  if (unlink(path) < 0 && errno != ENOENT)
2132  ereport(PANIC,
2134  errmsg("could not remove file \"%s\": %m", path)));
2135 
2136  sprintf(path, "%s/state", slotdir);
2137 
2138  elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2139 
2140  /* on some operating systems fsyncing a file requires O_RDWR */
2141  fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2142 
2143  /*
2144  * We do not need to handle this as we are rename()ing the directory into
2145  * place only after we fsync()ed the state file.
2146  */
2147  if (fd < 0)
2148  ereport(PANIC,
2150  errmsg("could not open file \"%s\": %m", path)));
2151 
2152  /*
2153  * Sync state file before we're reading from it. We might have crashed
2154  * while it wasn't synced yet and we shouldn't continue on that basis.
2155  */
2156  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2157  if (pg_fsync(fd) != 0)
2158  ereport(PANIC,
2160  errmsg("could not fsync file \"%s\": %m",
2161  path)));
2163 
2164  /* Also sync the parent directory */
2166  fsync_fname(slotdir, true);
2167  END_CRIT_SECTION();
2168 
2169  /* read part of statefile that's guaranteed to be version independent */
2170  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2171  readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2173  if (readBytes != ReplicationSlotOnDiskConstantSize)
2174  {
2175  if (readBytes < 0)
2176  ereport(PANIC,
2178  errmsg("could not read file \"%s\": %m", path)));
2179  else
2180  ereport(PANIC,
2182  errmsg("could not read file \"%s\": read %d of %zu",
2183  path, readBytes,
2185  }
2186 
2187  /* verify magic */
2188  if (cp.magic != SLOT_MAGIC)
2189  ereport(PANIC,
2191  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2192  path, cp.magic, SLOT_MAGIC)));
2193 
2194  /* verify version */
2195  if (cp.version != SLOT_VERSION)
2196  ereport(PANIC,
2198  errmsg("replication slot file \"%s\" has unsupported version %u",
2199  path, cp.version)));
2200 
2201  /* boundary check on length */
/* NOTE(review): the `if` guarding this PANIC is missing here in this listing. */
2203  ereport(PANIC,
2205  errmsg("replication slot file \"%s\" has corrupted length %u",
2206  path, cp.length)));
2207 
2208  /* Now that we know the size, read the entire file */
2209  pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2210  readBytes = read(fd,
2211  (char *) &cp + ReplicationSlotOnDiskConstantSize,
2212  cp.length);
2214  if (readBytes != cp.length)
2215  {
2216  if (readBytes < 0)
2217  ereport(PANIC,
2219  errmsg("could not read file \"%s\": %m", path)));
2220  else
2221  ereport(PANIC,
2223  errmsg("could not read file \"%s\": read %d of %zu",
2224  path, readBytes, (Size) cp.length)));
2225  }
2226 
2227  if (CloseTransientFile(fd) != 0)
2228  ereport(PANIC,
2230  errmsg("could not close file \"%s\": %m", path)));
2231 
2232  /* now verify the CRC */
2233  INIT_CRC32C(checksum);
2234  COMP_CRC32C(checksum,
2237  FIN_CRC32C(checksum);
2238 
2239  if (!EQ_CRC32C(checksum, cp.checksum))
2240  ereport(PANIC,
2241  (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2242  path, checksum, cp.checksum)));
2243 
2244  /*
2245  * If we crashed with an ephemeral slot active, don't restore but delete
2246  * it.
2247  */
/* NOTE(review): the `if` selecting ephemeral slots is missing here in this listing. */
2249  {
2250  if (!rmtree(slotdir, true))
2251  {
2252  ereport(WARNING,
2253  (errmsg("could not remove directory \"%s\"",
2254  slotdir)));
2255  }
2256  fsync_fname("pg_replslot", true);
2257  return;
2258  }
2259 
2260  /*
2261  * Verify that requirements for the specific slot type are met. That's
2262  * important because if these aren't met we're not guaranteed to retain
2263  * all the necessary resources for the slot.
2264  *
2265  * NB: We have to do so *after* the above checks for ephemeral slots,
2266  * because otherwise a slot that shouldn't exist anymore could prevent
2267  * restarts.
2268  *
2269  * NB: Changing the requirements here also requires adapting
2270  * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2271  */
/* NOTE(review): the logical-slot / wal_level `if` is missing here in this listing. */
2273  ereport(FATAL,
2274  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2275  errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
2276  NameStr(cp.slotdata.name)),
2277  errhint("Change wal_level to be logical or higher.")));
2278  else if (wal_level < WAL_LEVEL_REPLICA)
2279  ereport(FATAL,
2280  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2281  errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
2282  NameStr(cp.slotdata.name)),
2283  errhint("Change wal_level to be replica or higher.")));
2284 
2285  /* nothing can be active yet, don't lock anything */
2286  for (i = 0; i < max_replication_slots; i++)
2287  {
2288  ReplicationSlot *slot;
2289 
/* NOTE(review): the assignment of `slot` is missing here in this listing. */
2291 
2292  if (slot->in_use)
2293  continue;
2294 
2295  /* restore the entire set of persistent data */
2296  memcpy(&slot->data, &cp.slotdata,
2298 
2299  /* initialize in memory state */
2300  slot->effective_xmin = cp.slotdata.xmin;
2303 
2308 
2309  slot->in_use = true;
2310  slot->active_pid = 0;
2311 
2312  restored = true;
2313  break;
2314  }
2315 
2316  if (!restored)
2317  ereport(FATAL,
2318  (errmsg("too many replication slots active before shutdown"),
2319  errhint("Increase max_replication_slots and try again.")));
2320 }
2321 
2322 /*
2323  * Maps a conflict reason for a replication slot to
2324  * ReplicationSlotInvalidationCause.
 *
 * Linearly compares conflict_reason against SlotInvalidationCauses[] for
 * every cause from RS_INVAL_NONE through RS_INVAL_MAX_CAUSES inclusive. An
 * unknown string trips Assert(found) in assert-enabled builds only.
 *
 * NOTE(review): this listing is missing the return-type line and the
 * declarations of `cause` and `result` (its embedded line numbers skip).
 * Confirm that `result` is initialized there, since it is returned unchanged
 * when no entry matches in non-assert builds.
2325  */
2327 GetSlotInvalidationCause(const char *conflict_reason)
2328 {
2331  bool found PG_USED_FOR_ASSERTS_ONLY = false;
2332 
2333  Assert(conflict_reason);
2334 
2335  for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
2336  {
2337  if (strcmp(SlotInvalidationCauses[cause], conflict_reason) == 0)
2338  {
2339  found = true;
2340  result = cause;
2341  break;
2342  }
2343  }
2344 
2345  Assert(found);
2346  return result;
2347 }
#define InvalidBackendId
Definition: backendid.h:23
#define NameStr(name)
Definition: c.h:735
unsigned int uint32
Definition: c.h:495
#define ngettext(s, p, n)
Definition: c.h:1170
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:171
#define PG_BINARY
Definition: c.h:1262
#define pg_unreachable()
Definition: c.h:285
#define lengthof(array)
Definition: c.h:777
#define MemSet(start, val, len)
Definition: c.h:1009
uint32 TransactionId
Definition: c.h:641
size_t Size
Definition: c.h:594
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1235
int errcode_for_file_access(void)
Definition: elog.c:883
int errdetail(const char *fmt,...)
Definition: elog.c:1208
int errhint(const char *fmt,...)
Definition: elog.c:1322
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define _(x)
Definition: elog.c:91
#define LOG
Definition: elog.h:31
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2909
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3913
int FreeDir(DIR *dir)
Definition: fd.c:2961
int CloseTransientFile(int fd)
Definition: fd.c:2809
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
int pg_fsync(int fd)
Definition: fd.c:386
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2843
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:525
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
bool IsBinaryUpgrade
Definition: globals.c:117
int MyProcPid
Definition: globals.c:45
bool IsUnderPostmaster
Definition: globals.c:116
Oid MyDatabaseId
Definition: globals.c:90
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1893
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1937
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:703
@ LWTRANCHE_REPLICATION_SLOT_IO
Definition: lwlock.h:191
@ LW_SHARED
Definition: lwlock.h:117
@ LW_EXCLUSIVE
Definition: lwlock.h:116
char * pstrdup(const char *in)
Definition: mcxt.c:1619
void pfree(void *pointer)
Definition: mcxt.c:1431
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
@ B_STARTUP
Definition: miscadmin.h:338
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
Oid GetUserId(void)
Definition: miscinit.c:515
BackendType MyBackendType
Definition: miscinit.c:64
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:712
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
static bool two_phase
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PROC_IN_LOGICAL_DECODING
Definition: proc.h:60
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3854
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition: procsignal.h:46
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:426
void ReplicationSlotAlter(const char *name, bool failover)
Definition: slot.c:743
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:459
#define ReplicationSlotOnDiskChecksummedSize
Definition: slot.c:103
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:1779
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:272
void ReplicationSlotCleanup(void)
Definition: slot.c:682
void ReplicationSlotDropAcquired(void)
Definition: slot.c:804
void ReplicationSlotMarkDirty(void)
Definition: slot.c:946
void ReplicationSlotReserveWal(void)
Definition: slot.c:1337
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1148
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *conflict_reason)
Definition: slot.c:2327
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:502
bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:1723
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1206
#define ReplicationSlotOnDiskNotChecksummedSize
Definition: slot.c:100
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1090
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:985
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:2115
#define RS_INVAL_MAX_CAUSES
Definition: slot.c:91
void ReplicationSlotPersist(void)
Definition: slot.c:963
ReplicationSlot * MyReplicationSlot
Definition: slot.c:116
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:1960
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:720
void ReplicationSlotSave(void)
Definition: slot.c:928
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:1899
#define ReplicationSlotOnDiskV2Size
Definition: slot.c:106
void CheckSlotPermissions(void)
Definition: slot.c:1320
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:475
void ReplicationSlotsShmemInit(void)
Definition: slot.c:152
const char *const SlotInvalidationCauses[]
Definition: slot.c:83
void ReplicationSlotRelease(void)
Definition: slot.c:606
int max_replication_slots
Definition: slot.c:119
StaticAssertDecl(lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:113
#define SLOT_VERSION
Definition: slot.c:110
struct ReplicationSlotOnDisk ReplicationSlotOnDisk
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1041
void ReplicationSlotInitialize(void)
Definition: slot.c:187
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:821
static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated)
Definition: slot.c:1479
void StartupReplicationSlots(void)
Definition: slot.c:1838
void CheckSlotRequirements(void)
Definition: slot.c:1298
#define SLOT_MAGIC
Definition: slot.c:109
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon)
Definition: slot.c:1413
#define ReplicationSlotOnDiskConstantSize
Definition: slot.c:97
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:134
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:215
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:196
ReplicationSlotPersistency
Definition: slot.h:34
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
#define SlotIsPhysical(slot)
Definition: slot.h:206
ReplicationSlotInvalidationCause
Definition: slot.h:48
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:51
@ RS_INVAL_HORIZON
Definition: slot.h:53
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:55
@ RS_INVAL_NONE
Definition: slot.h:49
#define SlotIsLogical(slot)
Definition: slot.h:207
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1429
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:68
PROC_HDR * ProcGlobal
Definition: proc.c:81
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1286
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:32
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: dirent.c:26
uint8 statusFlags
Definition: proc.h:228
int pgxactoff
Definition: proc.h:188
uint8 * statusFlags
Definition: proc.h:373
ReplicationSlot replication_slots[1]
Definition: slot.h:218
uint32 version
Definition: slot.c:69
ReplicationSlotPersistentData slotdata
Definition: slot.c:77
pg_crc32c checksum
Definition: slot.c:66
TransactionId xmin
Definition: slot.h:82
TransactionId catalog_xmin
Definition: slot.h:90
XLogRecPtr restart_lsn
Definition: slot.h:93
XLogRecPtr confirmed_flush
Definition: slot.h:104
ReplicationSlotPersistency persistency
Definition: slot.h:74
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:96
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:194
TransactionId effective_catalog_xmin
Definition: slot.h:175
slock_t mutex
Definition: slot.h:151
XLogRecPtr candidate_restart_valid
Definition: slot.h:195
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:203
pid_t active_pid
Definition: slot.h:157
bool in_use
Definition: slot.h:154
TransactionId effective_xmin
Definition: slot.h:174
bool just_dirtied
Definition: slot.h:160
XLogRecPtr candidate_restart_lsn
Definition: slot.h:196
LWLock io_in_progress_lock
Definition: slot.h:181
ConditionVariable active_cv
Definition: slot.h:184
TransactionId candidate_catalog_xmin
Definition: slot.h:193
bool dirty
Definition: slot.h:161
ReplicationSlotPersistentData data
Definition: slot.h:178
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
Definition: type.h:95
Definition: c.h:730
unsigned short st_mode
Definition: win32_port.h:268
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
const char * name
bool am_walsender
Definition: walsender.c:118
bool log_replication_commands
Definition: walsender.c:128
#define stat
Definition: win32_port.h:284
#define S_ISDIR(m)
Definition: win32_port.h:325
#define kill(pid, sig)
Definition: win32_port.h:485
bool RecoveryInProgress(void)
Definition: xlog.c:6211
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3693
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6314
int wal_level
Definition: xlog.c:135
int wal_segment_size
Definition: xlog.c:147
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2619
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9272
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2733
@ WAL_LEVEL_REPLICA
Definition: xlog.h:73
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:74
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)