slotsync.c
1 /*-------------------------------------------------------------------------
2  * slotsync.c
3  * Functionality for synchronizing slots to a standby server from the
4  * primary server.
5  *
6  * Copyright (c) 2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/slotsync.c
10  *
11  * This file contains the code for slot synchronization on a physical standby
12  * to fetch logical failover slots information from the primary server, create
13  * the slots on the standby and synchronize them periodically.
14  *
15  * Slot synchronization can be performed either automatically by enabling the
16  * slot sync worker or manually by calling the SQL function pg_sync_replication_slots().
17  *
18  * If the WAL corresponding to the remote's restart_lsn is not available on the
19  * physical standby or the remote's catalog_xmin precedes the oldest xid for
20  * which it is guaranteed that rows wouldn't have been removed then we cannot
21  * create the local standby slot because that would mean moving the local slot
22  * backward and decoding won't be possible via such a slot. In this case, the
23  * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
24  * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
25  * which the slot sync worker can perform the sync periodically, or the user can
26  * call pg_sync_replication_slots() periodically to perform the syncs.
27  *
28  * If synchronized slots fail to build a consistent snapshot from the
29  * restart_lsn before reaching confirmed_flush_lsn, they would become
30  * unreliable after promotion due to potential data loss from changes
31  * before reaching a consistent point. This can happen because the slots can
32  * be synced at some random time and we may not reach the consistent point
33  * at the same WAL location as the primary. So, we mark such slots as
34  * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
35  * consistent point, they will be marked as RS_PERSISTENT.
36  *
37  * The slot sync worker waits for some time before the next synchronization,
38  * with the duration varying based on whether any slots were updated during
39  * the last cycle. Refer to the comments above wait_for_slot_activity() for
40  * more details.
41  *
42  * Any standby synchronized slots will be dropped if they no longer need
43  * to be synchronized. See comment atop drop_local_obsolete_slots() for more
44  * details.
45  *---------------------------------------------------------------------------
46  */
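/*
 * Illustrative configuration sketch (editorial example, values are not taken
 * from the sources): the GUCs referenced throughout this file would typically
 * be set on the standby along these lines before either sync method is used:
 *
 *   wal_level = logical                # see ValidateSlotSyncParams()
 *   primary_conninfo = 'host=primary port=5432 dbname=postgres'   # dbname is required
 *   primary_slot_name = 'standby1_phys_slot'
 *   hot_standby_feedback = on
 *   sync_replication_slots = on        # enables the slot sync worker
 *
 * A one-off manual sync can instead be triggered with:
 *
 *   SELECT pg_sync_replication_slots();
 */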
47 
48 #include "postgres.h"
49 
50 #include <time.h>
51 
52 #include "access/xlog_internal.h"
53 #include "access/xlogrecovery.h"
54 #include "catalog/pg_database.h"
55 #include "commands/dbcommands.h"
56 #include "libpq/pqsignal.h"
57 #include "pgstat.h"
59 #include "postmaster/interrupt.h"
60 #include "postmaster/postmaster.h"
61 #include "replication/logical.h"
62 #include "replication/slotsync.h"
63 #include "replication/snapbuild.h"
64 #include "storage/ipc.h"
65 #include "storage/lmgr.h"
66 #include "storage/proc.h"
67 #include "storage/procarray.h"
68 #include "tcop/tcopprot.h"
69 #include "utils/builtins.h"
70 #include "utils/pg_lsn.h"
71 #include "utils/ps_status.h"
72 #include "utils/timeout.h"
73 
74 /*
75  * Struct for sharing information to control slot synchronization.
76  *
77  * The slot sync worker's pid is needed by the startup process to shut it
78  * down during promotion. The startup process shuts down the slot sync worker
79  * and also sets stopSignaled=true to handle the race condition when the
80  * postmaster has not noticed the promotion yet and thus may end up restarting
81  * the slot sync worker. If stopSignaled is set, the worker will exit in such a
82  * case. Note that we don't need to reset this variable after promotion: the
83  * slot sync worker won't be restarted because the pmState changes from
84  * PM_HOT_STANDBY to PM_RUN, and we don't support demoting the primary without
85  * restarting the server. See MaybeStartSlotSyncWorker.
86  *
87  * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
88  * overwrites.
89  *
90  * The 'last_start_time' is needed by postmaster to start the slot sync worker
91  * once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate restart
92  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
93  * last_start_time before exiting, so that postmaster can start the worker
94  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
95  *
96  * All the fields except 'syncing' are used only by slotsync worker.
97  * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
98  */
99 typedef struct SlotSyncCtxStruct
100 {
101  pid_t pid;
102  bool stopSignaled;
103  bool syncing;
104  time_t last_start_time;
105  slock_t mutex;
106 } SlotSyncCtxStruct;
107 
108 SlotSyncCtxStruct *SlotSyncCtx = NULL;
109 
110 /* GUC variable */
111 bool sync_replication_slots = false;
112 
113 /*
114  * The sleep time (ms) between slot-sync cycles varies dynamically
115  * (within a MIN/MAX range) according to slot activity. See
116  * wait_for_slot_activity() for details.
117  */
118 #define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
119 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000 /* 30s */
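/*
 * Worked example of the doubling policy described in the comment above (the
 * numbers follow from the MIN/MAX constants; the sequence is illustrative):
 * with no slot activity the nap time grows 200ms -> 400ms -> 800ms -> ... ->
 * 25.6s and is then capped at 30s, i.e. the cap is reached after about eight
 * idle cycles, while any slot activity resets it to 200ms.
 */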
120 
121 static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
122 
123 /* The restart interval for slot sync work used by postmaster */
124 #define SLOTSYNC_RESTART_INTERVAL_SEC 10
125 
126 /*
127  * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
128  * in SlotSyncCtxStruct, this flag is true only if the current process is
129  * performing slot synchronization.
130  */
131 static bool syncing_slots = false;
132 
133 /*
134  * Structure to hold information fetched from the primary server about a logical
135  * replication slot.
136  */
137 typedef struct RemoteSlot
138 {
139  char *name;
140  char *plugin;
141  char *database;
142  bool two_phase;
143  bool failover;
144  XLogRecPtr restart_lsn;
145  XLogRecPtr confirmed_lsn;
146  TransactionId catalog_xmin;
147 
148  /* RS_INVAL_NONE if valid, or the reason of invalidation */
149  ReplicationSlotInvalidationCause invalidated;
150 } RemoteSlot;
151 
152 static void slotsync_failure_callback(int code, Datum arg);
153 static void update_synced_slots_inactive_since(void);
154 
155 /*
156  * If necessary, update the local synced slot's metadata based on the data
157  * from the remote slot.
158  *
159  * If no update was needed (the data of the remote slot is the same as the
160  * local slot) return false, otherwise true.
161  *
162  * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
163  * modified, and decoding from the corresponding LSNs can reach a
164  * consistent snapshot.
165  *
166  * *remote_slot_precedes will be true if the remote slot's LSN or xmin
167  * precedes the locally reserved position.
168  */
169 static bool
170 update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
171  bool *found_consistent_snapshot,
172  bool *remote_slot_precedes)
173 {
174  ReplicationSlot *slot = MyReplicationSlot;
175  bool updated_xmin_or_lsn = false;
176  bool updated_config = false;
177 
178  Assert(slot->data.invalidated == RS_INVAL_NONE);
179 
180  if (found_consistent_snapshot)
181  *found_consistent_snapshot = false;
182 
183  if (remote_slot_precedes)
184  *remote_slot_precedes = false;
185 
186  /*
187  * Don't overwrite if we already have a newer catalog_xmin and
188  * restart_lsn.
189  */
190  if (remote_slot->restart_lsn < slot->data.restart_lsn ||
191  TransactionIdPrecedes(remote_slot->catalog_xmin,
192  slot->data.catalog_xmin))
193  {
194  /*
195  * This can happen in the following situations:
196  *
197  * If the slot is temporary, it means either the initial WAL location
198  * reserved for the local slot is ahead of the remote slot's
199  * restart_lsn or the initial xmin_horizon computed for the local slot
200  * is ahead of the remote slot.
201  *
202  * If the slot is persistent, restart_lsn of the synced slot could
203  * still be ahead of the remote slot. Since we use slot advance
204  * functionality to keep snapbuild/slot updated, it is possible that
205  * the restart_lsn is advanced to a later position than it has on the
206  * primary. This can happen when slot advancing machinery finds
207  * running xacts record after reaching the consistent state at a later
208  * point than the primary where it serializes the snapshot and updates
209  * the restart_lsn.
210  *
211  * We LOG the message if the slot is temporary as it can help the user
212  * to understand why the slot is not sync-ready. In the case of a
213  * persistent slot, this is a more common case and won't directly
214  * impact the users, so we log the message at DEBUG1 level.
215  */
216  ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
217  errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
218  remote_slot->name),
219  errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
220  LSN_FORMAT_ARGS(remote_slot->restart_lsn),
221  remote_slot->catalog_xmin,
222  LSN_FORMAT_ARGS(slot->data.restart_lsn),
223  slot->data.catalog_xmin));
224 
225  if (remote_slot_precedes)
226  *remote_slot_precedes = true;
227  }
228 
229  /*
230  * Attempt to sync LSNs and xmins only if remote slot is ahead of local
231  * slot.
232  */
233  else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
234  remote_slot->restart_lsn > slot->data.restart_lsn ||
235  TransactionIdFollows(remote_slot->catalog_xmin,
236  slot->data.catalog_xmin))
237  {
238  /*
239  * We can't directly copy the remote slot's LSN or xmin unless there
240  * exists a consistent snapshot at that point. Otherwise, after
241  * promotion, the slots may not reach a consistent point before the
242  * confirmed_flush_lsn which can lead to a data loss. To avoid data
243  * loss, we let slot machinery advance the slot which ensures that
244  * snapbuilder/slot statuses are updated properly.
245  */
246  if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
247  {
248  /*
249  * Update the slot info directly if there is a serialized snapshot
250  * at the restart_lsn, as the slot can quickly reach consistency
251  * at restart_lsn by restoring the snapshot.
252  */
253  SpinLockAcquire(&slot->mutex);
254  slot->data.restart_lsn = remote_slot->restart_lsn;
255  slot->data.confirmed_flush = remote_slot->confirmed_lsn;
256  slot->data.catalog_xmin = remote_slot->catalog_xmin;
257  SpinLockRelease(&slot->mutex);
258 
259  if (found_consistent_snapshot)
260  *found_consistent_snapshot = true;
261  }
262  else
263  {
264  LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
265  found_consistent_snapshot);
266 
267  /* Sanity check */
268  if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
269  ereport(ERROR,
270  errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
271  remote_slot->name),
272  errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
273  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
274  LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
275  }
276 
277  updated_xmin_or_lsn = true;
278  }
279 
280  if (remote_dbid != slot->data.database ||
281  remote_slot->two_phase != slot->data.two_phase ||
282  remote_slot->failover != slot->data.failover ||
283  strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
284  {
285  NameData plugin_name;
286 
287  /* Avoid expensive operations while holding a spinlock. */
288  namestrcpy(&plugin_name, remote_slot->plugin);
289 
290  SpinLockAcquire(&slot->mutex);
291  slot->data.plugin = plugin_name;
292  slot->data.database = remote_dbid;
293  slot->data.two_phase = remote_slot->two_phase;
294  slot->data.failover = remote_slot->failover;
295  SpinLockRelease(&slot->mutex);
296 
297  updated_config = true;
298  }
299 
300  /*
301  * We have to write the changed xmin to disk *before* we change the
302  * in-memory value, otherwise after a crash we wouldn't know that some
303  * catalog tuples might have been removed already.
304  */
305  if (updated_config || updated_xmin_or_lsn)
306  {
307  ReplicationSlotMarkDirty();
308  ReplicationSlotSave();
309  }
310 
311  /*
312  * Now the new xmin is safely on disk, we can let the global value
313  * advance. We do not take ProcArrayLock or similar since we only advance
314  * xmin here and there's not much harm done by a concurrent computation
315  * missing that.
316  */
317  if (updated_xmin_or_lsn)
318  {
319  SpinLockAcquire(&slot->mutex);
320  slot->effective_catalog_xmin = remote_slot->catalog_xmin;
321  SpinLockRelease(&slot->mutex);
322 
323  ReplicationSlotsComputeRequiredXmin(false);
324  ReplicationSlotsComputeRequiredLSN();
325  }
326 
327  return updated_config || updated_xmin_or_lsn;
328 }
329 
330 /*
331  * Get the list of local logical slots that are synchronized from the
332  * primary server.
333  */
334 static List *
335 get_local_synced_slots(void)
336 {
337  List *local_slots = NIL;
338 
339  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
340 
341  for (int i = 0; i < max_replication_slots; i++)
342  {
343  ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
344 
345  /* Check if it is a synchronized slot */
346  if (s->in_use && s->data.synced)
347  {
348  Assert(SlotIsLogical(s));
349  local_slots = lappend(local_slots, s);
350  }
351  }
352 
353  LWLockRelease(ReplicationSlotControlLock);
354 
355  return local_slots;
356 }
357 
358 /*
359  * Helper function to check if local_slot is required to be retained.
360  *
361  * Return false if local_slot does not exist in the remote_slots list, or if
362  * it is invalidated while the corresponding remote slot is still valid;
363  * otherwise return true.
364  */
365 static bool
366 local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
367 {
368  bool remote_exists = false;
369  bool locally_invalidated = false;
370 
371  foreach_ptr(RemoteSlot, remote_slot, remote_slots)
372  {
373  if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
374  {
375  remote_exists = true;
376 
377  /*
378  * If remote slot is not invalidated but local slot is marked as
379  * invalidated, then set locally_invalidated flag.
380  */
381  SpinLockAcquire(&local_slot->mutex);
382  locally_invalidated =
383  (remote_slot->invalidated == RS_INVAL_NONE) &&
384  (local_slot->data.invalidated != RS_INVAL_NONE);
385  SpinLockRelease(&local_slot->mutex);
386 
387  break;
388  }
389  }
390 
391  return (remote_exists && !locally_invalidated);
392 }
393 
394 /*
395  * Drop local obsolete slots.
396  *
397  * Drop the local slots that no longer need to be synced, i.e., they either do
398  * not exist on the primary or are no longer enabled for failover.
399  *
400  * Additionally, drop any slots that are valid on the primary but got
401  * invalidated on the standby. This situation may occur due to the following
402  * reasons:
403  * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
404  * records from the restart_lsn of the slot.
405  * - 'primary_slot_name' is temporarily reset to null and the physical slot is
406  * removed.
407  * These dropped slots will get recreated in the next sync-cycle, and it is okay to
408  * drop and recreate such slots as long as these are not consumable on the
409  * standby (which is the case currently).
410  *
411  * Note: Change of 'wal_level' on the primary server to a level lower than
412  * logical may also result in slot invalidation and removal on the standby.
413  * This is because such 'wal_level' change is only possible if the logical
414  * slots are removed on the primary server, so it's expected to see the
415  * slots being invalidated and removed on the standby too (and re-created
416  * if they are re-created on the primary server).
417  */
418 static void
419 drop_local_obsolete_slots(List *remote_slot_list)
420 {
421  List *local_slots = get_local_synced_slots();
422 
423  foreach_ptr(ReplicationSlot, local_slot, local_slots)
424  {
425  /* Drop the local slot if it is not required to be retained. */
426  if (!local_sync_slot_required(local_slot, remote_slot_list))
427  {
428  bool synced_slot;
429 
430  /*
431  * Use shared lock to prevent a conflict with
432  * ReplicationSlotsDropDBSlots(), trying to drop the same slot
433  * during a drop-database operation.
434  */
435  LockSharedObject(DatabaseRelationId, local_slot->data.database,
436  0, AccessShareLock);
437 
438  /*
439  * In the small window between getting the slot to drop and
440  * locking the database, there is a possibility of a parallel
441  * database drop by the startup process and the creation of a new
442  * slot by the user. This new user-created slot may end up using
443  * the same shared memory as that of 'local_slot'. Thus check if
444  * local_slot is still the synced one before performing actual
445  * drop.
446  */
447  SpinLockAcquire(&local_slot->mutex);
448  synced_slot = local_slot->in_use && local_slot->data.synced;
449  SpinLockRelease(&local_slot->mutex);
450 
451  if (synced_slot)
452  {
453  ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
454  ReplicationSlotDropAcquired();
455  }
456 
457  UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
458  0, AccessShareLock);
459 
460  ereport(LOG,
461  errmsg("dropped replication slot \"%s\" of dbid %d",
462  NameStr(local_slot->data.name),
463  local_slot->data.database));
464  }
465  }
466 }
467 
468 /*
469  * Reserve WAL for the currently active local slot using the specified WAL
470  * location (restart_lsn).
471  *
472  * If the given WAL location has been removed, reserve WAL using the oldest
473  * existing WAL segment.
474  */
475 static void
476 reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
477 {
478  XLogSegNo oldest_segno;
479  XLogSegNo segno;
480  ReplicationSlot *slot = MyReplicationSlot;
481 
482  Assert(slot != NULL);
484 
485  while (true)
486  {
487  SpinLockAcquire(&slot->mutex);
488  slot->data.restart_lsn = restart_lsn;
489  SpinLockRelease(&slot->mutex);
490 
491  /* Prevent WAL removal as fast as possible */
492  ReplicationSlotsComputeRequiredLSN();
493 
494  XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
495 
496  /*
497  * Find the oldest existing WAL segment file.
498  *
499  * Normally, we can determine it by using the last removed segment
500  * number. However, if no WAL segment files have been removed by a
501  * checkpoint since startup, we need to search for the oldest segment
502  * file from the current timeline existing in XLOGDIR.
503  *
504  * XXX: Currently, we search for the oldest segment in the current
505  * timeline as there is little chance of the slot's restart_lsn being
506  * from some prior timeline, and even if that happens, in the worst
507  * case, we will wait to sync until the slot's restart_lsn has moved
508  * to the current timeline.
509  */
510  oldest_segno = XLogGetLastRemovedSegno() + 1;
511 
512  if (oldest_segno == 1)
513  {
514  TimeLineID cur_timeline;
515 
516  GetWalRcvFlushRecPtr(NULL, &cur_timeline);
517  oldest_segno = XLogGetOldestSegno(cur_timeline);
518  }
519 
520  elog(DEBUG1, "segno: " UINT64_FORMAT " of proposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
521  segno, oldest_segno);
522 
523  /*
524  * If all required WAL is still there, great, otherwise retry. The
525  * slot should prevent further removal of WAL, unless there's a
526  * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
527  * the new restart_lsn above, so normally we should never need to loop
528  * more than twice.
529  */
530  if (segno >= oldest_segno)
531  break;
532 
533  /* Retry using the location of the oldest wal segment */
534  XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
535  }
536 }
537 
538 /*
539  * If the remote restart_lsn and catalog_xmin have caught up with the
540  * local ones, then update the LSNs and persist the local synced slot for
541  * future synchronization; otherwise, do nothing.
542  *
543  * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
544  * false.
545  */
546 static bool
547 update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
548 {
549  ReplicationSlot *slot = MyReplicationSlot;
550  bool found_consistent_snapshot = false;
551  bool remote_slot_precedes = false;
552 
553  (void) update_local_synced_slot(remote_slot, remote_dbid,
554  &found_consistent_snapshot,
555  &remote_slot_precedes);
556 
557  /*
558  * Check if the primary server has caught up. Refer to the comment atop
559  * the file for details on this check.
560  */
561  if (remote_slot_precedes)
562  {
563  /*
564  * The remote slot didn't catch up to locally reserved position.
565  *
566  * We do not drop the slot because the restart_lsn can be ahead of the
567  * current location when recreating the slot in the next cycle. It may
568  * take more time to create such a slot. Therefore, we keep this slot
569  * and attempt the synchronization in the next cycle.
570  */
571  return false;
572  }
573 
574  /*
575  * Don't persist the slot if it cannot reach the consistent point from the
576  * restart_lsn. See comments atop this file.
577  */
578  if (!found_consistent_snapshot)
579  {
580  ereport(LOG,
581  errmsg("could not sync slot \"%s\"", remote_slot->name),
582  errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
583  LSN_FORMAT_ARGS(slot->data.restart_lsn)));
584 
585  return false;
586  }
587 
588  ReplicationSlotPersist();
589 
590  ereport(LOG,
591  errmsg("newly created slot \"%s\" is sync-ready now",
592  remote_slot->name));
593 
594  return true;
595 }
596 
597 /*
598  * Synchronize a single slot to the given position.
599  *
600  * This creates a new slot if there is no existing one and updates the
601  * metadata of the slot as per the data received from the primary server.
602  *
603  * The slot is created as a temporary slot and stays in the same state until
604  * the remote_slot catches up with the locally reserved position and the local slot is
605  * updated. The slot is then persisted and is considered as sync-ready for
606  * periodic syncs.
607  *
608  * Returns TRUE if the local slot is updated.
609  */
610 static bool
611 synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
612 {
613  ReplicationSlot *slot;
614  XLogRecPtr latestFlushPtr;
615  bool slot_updated = false;
616 
617  /*
618  * Make sure that concerned WAL is received and flushed before syncing
619  * slot to target lsn received from the primary server.
620  */
621  latestFlushPtr = GetStandbyFlushRecPtr(NULL);
622  if (remote_slot->confirmed_lsn > latestFlushPtr)
623  {
624  /*
625  * Can get here only if GUC 'standby_slot_names' on the primary server
626  * was not configured correctly.
627  */
628  ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
629  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
630  errmsg("skipping slot synchronization as the received slot sync"
631  " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
632  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
633  remote_slot->name,
634  LSN_FORMAT_ARGS(latestFlushPtr)));
635 
636  return false;
637  }
638 
639  /* Search for the named slot */
640  if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
641  {
642  bool synced;
643 
644  SpinLockAcquire(&slot->mutex);
645  synced = slot->data.synced;
646  SpinLockRelease(&slot->mutex);
647 
648  /* User-created slot with the same name exists, raise ERROR. */
649  if (!synced)
650  ereport(ERROR,
651  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652  errmsg("exiting from slot synchronization because same"
653  " name slot \"%s\" already exists on the standby",
654  remote_slot->name));
655 
656  /*
657  * The slot has been synchronized before.
658  *
659  * It is important to acquire the slot here before checking
660  * invalidation. If we don't acquire the slot first, there could be a
661  * race condition that the local slot could be invalidated just after
662  * checking the 'invalidated' flag here and we could end up
663  * overwriting 'invalidated' flag to remote_slot's value. See
664  * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
665  * if the slot is not acquired by other processes.
666  *
667  * XXX: If it ever turns out that slot acquire/release is costly for
668  * cases when none of the slot properties is changed then we can do a
669  * pre-check to ensure that at least one of the slot properties is
670  * changed before acquiring the slot.
671  */
672  ReplicationSlotAcquire(remote_slot->name, true);
673 
674  Assert(slot == MyReplicationSlot);
675 
676  /*
677  * Copy the invalidation cause from remote only if local slot is not
678  * invalidated locally, we don't want to overwrite existing one.
679  */
680  if (slot->data.invalidated == RS_INVAL_NONE &&
681  remote_slot->invalidated != RS_INVAL_NONE)
682  {
683  SpinLockAcquire(&slot->mutex);
684  slot->data.invalidated = remote_slot->invalidated;
685  SpinLockRelease(&slot->mutex);
686 
687  /* Make sure the invalidated state persists across server restart */
688  ReplicationSlotMarkDirty();
689  ReplicationSlotSave();
690 
691  slot_updated = true;
692  }
693 
694  /* Skip the sync of an invalidated slot */
695  if (slot->data.invalidated != RS_INVAL_NONE)
696  {
697  ReplicationSlotRelease();
698  return slot_updated;
699  }
700 
701  /* Slot not ready yet, let's attempt to make it sync-ready now. */
702  if (slot->data.persistency == RS_TEMPORARY)
703  {
704  slot_updated = update_and_persist_local_synced_slot(remote_slot,
705  remote_dbid);
706  }
707 
708  /* Slot ready for sync, so sync it. */
709  else
710  {
711  /*
712  * Sanity check: As long as the invalidations are handled
713  * appropriately as above, this should never happen.
714  *
715  * We don't need to check restart_lsn here. See the comments in
716  * update_local_synced_slot() for details.
717  */
718  if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
719  ereport(ERROR,
720  errmsg_internal("cannot synchronize local slot \"%s\"",
721  remote_slot->name),
722  errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
724  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
725 
726  slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
727  NULL, NULL);
728  }
729  }
730  /* Otherwise create the slot first. */
731  else
732  {
733  NameData plugin_name;
734  TransactionId xmin_horizon = InvalidTransactionId;
735 
736  /* Skip creating the local slot if remote_slot is invalidated already */
737  if (remote_slot->invalidated != RS_INVAL_NONE)
738  return false;
739 
740  /*
741  * We create temporary slots instead of ephemeral slots here because
742  * we want the slots to survive after releasing them. This is done to
743  * avoid dropping and re-creating the slots in each synchronization
744  * cycle if the restart_lsn or catalog_xmin of the remote slot has not
745  * caught up.
746  */
747  ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
748  remote_slot->two_phase,
749  remote_slot->failover,
750  true);
751 
752  /* For shorter lines. */
753  slot = MyReplicationSlot;
754 
755  /* Avoid expensive operations while holding a spinlock. */
756  namestrcpy(&plugin_name, remote_slot->plugin);
757 
758  SpinLockAcquire(&slot->mutex);
759  slot->data.database = remote_dbid;
760  slot->data.plugin = plugin_name;
761  SpinLockRelease(&slot->mutex);
762 
763  reserve_wal_for_local_slot(remote_slot->restart_lsn);
764 
765  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
766  xmin_horizon = GetOldestSafeDecodingTransactionId(true);
767  SpinLockAcquire(&slot->mutex);
768  slot->effective_catalog_xmin = xmin_horizon;
769  slot->data.catalog_xmin = xmin_horizon;
770  SpinLockRelease(&slot->mutex);
771  ReplicationSlotsComputeRequiredXmin(true);
772  LWLockRelease(ProcArrayLock);
773 
774  update_and_persist_local_synced_slot(remote_slot, remote_dbid);
775 
776  slot_updated = true;
777  }
778 
779  ReplicationSlotRelease();
780 
781  return slot_updated;
782 }
783 
784 /*
785  * Synchronize slots.
786  *
787  * Gets the failover logical slots info from the primary server and updates
788  * the slots locally. Creates the slots if not present on the standby.
789  *
790  * Returns TRUE if any of the slots gets updated in this sync-cycle.
791  */
792 static bool
793 synchronize_slots(WalReceiverConn *wrconn)
794 {
795 #define SLOTSYNC_COLUMN_COUNT 9
796  Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
797  LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
798 
799  WalRcvExecResult *res;
800  TupleTableSlot *tupslot;
801  List *remote_slot_list = NIL;
802  bool some_slot_updated = false;
803  bool started_tx = false;
804  const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
805  " restart_lsn, catalog_xmin, two_phase, failover,"
806  " database, invalidation_reason"
807  " FROM pg_catalog.pg_replication_slots"
808  " WHERE failover and NOT temporary";
809 
810  SpinLockAcquire(&SlotSyncCtx->mutex);
811  if (SlotSyncCtx->syncing)
812  {
813  SpinLockRelease(&SlotSyncCtx->mutex);
814  ereport(ERROR,
815  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
816  errmsg("cannot synchronize replication slots concurrently"));
817  }
818 
819  SlotSyncCtx->syncing = true;
820  SpinLockRelease(&SlotSyncCtx->mutex);
821 
822  syncing_slots = true;
823 
824  /* The syscache access in walrcv_exec() needs a transaction env. */
825  if (!IsTransactionState())
826  {
827  StartTransactionCommand();
828  started_tx = true;
829  }
830 
831  /* Execute the query */
832  res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
833  if (res->status != WALRCV_OK_TUPLES)
834  ereport(ERROR,
835  errmsg("could not fetch failover logical slots info from the primary server: %s",
836  res->err));
837 
838  /* Construct the remote_slot tuple and synchronize each slot locally */
839  tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
840  while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
841  {
842  bool isnull;
843  RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
844  Datum d;
845  int col = 0;
846 
847  remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
848  &isnull));
849  Assert(!isnull);
850 
851  remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
852  &isnull));
853  Assert(!isnull);
854 
855  /*
856  * It is possible to get null values for LSN and Xmin if slot is
857  * invalidated on the primary server, so handle accordingly.
858  */
859  d = slot_getattr(tupslot, ++col, &isnull);
860  remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
861  DatumGetLSN(d);
862 
863  d = slot_getattr(tupslot, ++col, &isnull);
864  remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
865 
866  d = slot_getattr(tupslot, ++col, &isnull);
867  remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
868  DatumGetTransactionId(d);
869 
870  remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
871  &isnull));
872  Assert(!isnull);
873 
874  remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
875  &isnull));
876  Assert(!isnull);
877 
878  remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
879  ++col, &isnull));
880  Assert(!isnull);
881 
882  d = slot_getattr(tupslot, ++col, &isnull);
883  remote_slot->invalidated = isnull ? RS_INVAL_NONE :
884  GetSlotInvalidationCause(TextDatumGetCString(d));
885 
886  /* Sanity check */
887  Assert(col == SLOTSYNC_COLUMN_COUNT);
888 
889  /*
890  * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
891  * slot is valid, that means we have fetched the remote_slot in its
892  * RS_EPHEMERAL state. In such a case, don't sync it; we can always
893  * sync it in the next sync cycle when the remote_slot is persisted
894  * and has valid lsn(s) and xmin values.
895  *
896  * XXX: In future, if we plan to expose 'slot->data.persistency' in
897  * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
898  * slots in the first place.
899  */
900  if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
901  XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
902  !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
903  remote_slot->invalidated == RS_INVAL_NONE)
904  pfree(remote_slot);
905  else
906  /* Create list of remote slots */
907  remote_slot_list = lappend(remote_slot_list, remote_slot);
908 
909  ExecClearTuple(tupslot);
910  }
911 
912  /* Drop local slots that no longer need to be synced. */
913  drop_local_obsolete_slots(remote_slot_list);
914 
915  /* Now sync the slots locally */
916  foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
917  {
918  Oid remote_dbid = get_database_oid(remote_slot->database, false);
919 
920  /*
921  * Use shared lock to prevent a conflict with
922  * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
923  * a drop-database operation.
924  */
925  LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
926 
927  some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
928 
929  UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
930  }
931 
932  /* We are done, free remote_slot_list elements */
933  list_free_deep(remote_slot_list);
934 
935  walrcv_clear_result(res);
936 
937  if (started_tx)
938  CommitTransactionCommand();
939 
940  SpinLockAcquire(&SlotSyncCtx->mutex);
941  SlotSyncCtx->syncing = false;
942  SpinLockRelease(&SlotSyncCtx->mutex);
943 
944  syncing_slots = false;
945 
946  return some_slot_updated;
947 }
948 
949 /*
950  * Checks the remote server info.
951  *
952  * We ensure that the 'primary_slot_name' exists on the remote server and the
953  * remote server is not a standby node.
954  */
955 static void
956 validate_remote_info(WalReceiverConn *wrconn)
957 {
958 #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
959  WalRcvExecResult *res;
960  Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
961  StringInfoData cmd;
962  bool isnull;
963  TupleTableSlot *tupslot;
964  bool remote_in_recovery;
965  bool primary_slot_valid;
966  bool started_tx = false;
967 
968  initStringInfo(&cmd);
969  appendStringInfo(&cmd,
970  "SELECT pg_is_in_recovery(), count(*) = 1"
971  " FROM pg_catalog.pg_replication_slots"
972  " WHERE slot_type='physical' AND slot_name=%s",
973  quote_literal_cstr(PrimarySlotName));
974 
975  /* The syscache access in walrcv_exec() needs a transaction env. */
976  if (!IsTransactionState())
977  {
978  StartTransactionCommand();
979  started_tx = true;
980  }
981 
982  res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
983  pfree(cmd.data);
984 
985  if (res->status != WALRCV_OK_TUPLES)
986  ereport(ERROR,
987  errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s",
988  PrimarySlotName, res->err),
989  errhint("Check if primary_slot_name is configured correctly."));
990 
991  tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
992  if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
993  elog(ERROR,
994  "failed to fetch tuple for the primary server slot specified by primary_slot_name");
995 
996  remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
997  Assert(!isnull);
998 
999  /*
1000  * Slot sync is currently not supported on a cascading standby. This is
1001  * because if we allow it, the primary server needs to wait for all the
1002  * cascading standbys, otherwise, logical subscribers can still be ahead
1003  * of one of the cascading standbys which we plan to promote. Thus, to
1004  * avoid this additional complexity, we restrict it for the time being.
1005  */
1006  if (remote_in_recovery)
1007  ereport(ERROR,
1008  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1009  errmsg("cannot synchronize replication slots from a standby server"));
1010 
1011  primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
1012  Assert(!isnull);
1013 
1014  if (!primary_slot_valid)
1015  ereport(ERROR,
1016  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1017  errmsg("slot synchronization requires valid primary_slot_name"),
1018  /* translator: second %s is a GUC variable name */
1019  errdetail("The replication slot \"%s\" specified by %s does not exist on the primary server.",
1020  PrimarySlotName, "primary_slot_name"));
1021 
1022  ExecClearTuple(tupslot);
1023  walrcv_clear_result(res);
1024 
1025  if (started_tx)
1026  CommitTransactionCommand();
1027 }
1028 
1029 /*
1030  * Checks if dbname is specified in 'primary_conninfo'.
1031  *
1032  * Error out if not specified otherwise return it.
1033  */
1034 char *
1035 CheckAndGetDbnameFromConninfo(void)
1036 {
1037  char *dbname;
1038 
1039  /*
1040  * The slot synchronization needs a database connection for walrcv_exec to
1041  * work.
1042  */
1043  dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
1044  if (dbname == NULL)
1045  ereport(ERROR,
1046 
1047  /*
1048  * translator: dbname is a specific option; %s is a GUC variable name
1049  */
1050  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1051  errmsg("slot synchronization requires dbname to be specified in %s",
1052  "primary_conninfo"));
1053  return dbname;
1054 }
1055 
1056 /*
1057  * Return true if all necessary GUCs for slot synchronization are set
1058  * appropriately, otherwise, return false.
1059  */
1060 bool
1061 ValidateSlotSyncParams(int elevel)
1062 {
1063  /*
1064  * Logical slot sync/creation requires wal_level >= logical.
1065  *
1066  * Since altering wal_level requires a server restart, error out in this
1067  * case regardless of the elevel provided by the caller.
1068  */
1069  if (wal_level < WAL_LEVEL_LOGICAL)
1070  ereport(ERROR,
1071  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1072  errmsg("slot synchronization requires wal_level >= \"logical\""));
1073 
1074  /*
1075  * A physical replication slot(primary_slot_name) is required on the
1076  * primary to ensure that the rows needed by the standby are not removed
1077  * after restarting, so that the synchronized slot on the standby will not
1078  * be invalidated.
1079  */
1080  if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
1081  {
1082  ereport(elevel,
1083  /* translator: %s is a GUC variable name */
1084  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1085  errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
1086  return false;
1087  }
1088 
1089  /*
1090  * hot_standby_feedback must be enabled to cooperate with the physical
1091  * replication slot, which allows informing the primary about the xmin and
1092  * catalog_xmin values on the standby.
1093  */
1094  if (!hot_standby_feedback)
1095  {
1096  ereport(elevel,
1097  /* translator: %s is a GUC variable name */
1098  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1099  errmsg("slot synchronization requires %s to be enabled",
1100  "hot_standby_feedback"));
1101  return false;
1102  }
1103 
1104  /*
1105  * The primary_conninfo is required to make a connection to the primary
1106  * for fetching slot information.
1107  */
1108  if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
1109  {
1110  ereport(elevel,
1111  /* translator: %s is a GUC variable name */
1112  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1113  errmsg("slot synchronization requires %s to be defined",
1114  "primary_conninfo"));
1115  return false;
1116  }
1117 
1118  return true;
1119 }
1120 
1121 /*
1122  * Re-read the config file.
1123  *
1124  * Exit if any of the slot sync GUCs have changed. The postmaster will
1125  * restart it.
1126  */
1127 static void
1128 slotsync_reread_config(void)
1129 {
1130  char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
1131  char *old_primary_slotname = pstrdup(PrimarySlotName);
1132  bool old_sync_replication_slots = sync_replication_slots;
1133  bool old_hot_standby_feedback = hot_standby_feedback;
1134  bool conninfo_changed;
1135  bool primary_slotname_changed;
1136 
1137  Assert(sync_replication_slots);
1138 
1139  ConfigReloadPending = false;
1140  ProcessConfigFile(PGC_SIGHUP);
1141 
1142  conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
1143  primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
1144  pfree(old_primary_conninfo);
1145  pfree(old_primary_slotname);
1146 
1147  if (old_sync_replication_slots != sync_replication_slots)
1148  {
1149  ereport(LOG,
1150  /* translator: %s is a GUC variable name */
1151  errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots"));
1152  proc_exit(0);
1153  }
1154 
1155  if (conninfo_changed ||
1156  primary_slotname_changed ||
1157  (old_hot_standby_feedback != hot_standby_feedback))
1158  {
1159  ereport(LOG,
1160  errmsg("slot sync worker will restart because of a parameter change"));
1161 
1162  /*
1163  * Reset the last-start time for this worker so that the postmaster
1164  * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
1165  */
1166  SlotSyncCtx->last_start_time = 0;
1167 
1168  proc_exit(0);
1169  }
1170 
1171 }
1172 
1173 /*
1174  * Interrupt handler for main loop of slot sync worker.
1175  */
1176 static void
1177 ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
1178 {
1179  CHECK_FOR_INTERRUPTS();
1180 
1181  if (ShutdownRequestPending)
1182  {
1183  ereport(LOG,
1184  errmsg("slot sync worker is shutting down on receiving SIGINT"));
1185 
1186  proc_exit(0);
1187  }
1188 
1189  if (ConfigReloadPending)
1190  slotsync_reread_config();
1191 }
1192 
1193 /*
1194  * Cleanup function for slotsync worker.
1195  *
1196  * Called on slotsync worker exit.
1197  */
1198 static void
1199 slotsync_worker_onexit(int code, Datum arg)
1200 {
1201  SpinLockAcquire(&SlotSyncCtx->mutex);
1202  SlotSyncCtx->pid = InvalidPid;
1203  SpinLockRelease(&SlotSyncCtx->mutex);
1204 }
1205 
1206 /*
1207  * Sleep for long enough that we believe it's likely that the slots on the
1208  * primary get updated.
1209  *
1210  * If there is no slot activity the wait time between sync-cycles will double
1211  * (to a maximum of 30s). If there is some slot activity the wait time between
1212  * sync-cycles is reset to the minimum (200ms).
1213  */
1214 static void
1215 wait_for_slot_activity(bool some_slot_updated)
1216 {
1217  int rc;
1218 
1219  if (!some_slot_updated)
1220  {
1221  /*
1222  * No slots were updated, so double the sleep time, but not beyond the
1223  * maximum allowable value.
1224  */
1225  sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
1226  }
1227  else
1228  {
1229  /*
1230  * Some slots were updated since the last sleep, so reset the sleep
1231  * time.
1232  */
1233  sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
1234  }
1235 
1236  rc = WaitLatch(MyLatch,
1237  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1238  sleep_ms,
1239  WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1240 
1241  if (rc & WL_LATCH_SET)
1242  ResetLatch(MyLatch);
1243 }
1244 
1245 /*
1246  * The main loop of our worker process.
1247  *
1248  * It connects to the primary server, fetches logical failover slots
1249  * information periodically in order to create and sync the slots.
1250  */
1251 void
1252 ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
1253 {
1254  WalReceiverConn *wrconn = NULL;
1255  char *dbname;
1256  char *err;
1257  sigjmp_buf local_sigjmp_buf;
1258  StringInfoData app_name;
1259 
1260  Assert(startup_data_len == 0);
1261 
1262  MyBackendType = B_SLOTSYNC_WORKER;
1263 
1264  init_ps_display(NULL);
1265 
1266  SetProcessingMode(InitProcessing);
1267 
1268  /*
1269  * Create a per-backend PGPROC struct in shared memory. We must do this
1270  * before we access any shared memory.
1271  */
1272  InitProcess();
1273 
1274  /*
1275  * Early initialization.
1276  */
1277  BaseInit();
1278 
1279  Assert(SlotSyncCtx != NULL);
1280 
1283 
1284  /*
1285  * Startup process signaled the slot sync worker to stop, so if meanwhile
1286  * postmaster ended up starting the worker again, exit.
1287  */
1288  if (SlotSyncCtx->stopSignaled)
1289  {
1290  SpinLockRelease(&SlotSyncCtx->mutex);
1291  proc_exit(0);
1292  }
1293 
1294  /* Advertise our PID so that the startup process can kill us on promotion */
1295  SlotSyncCtx->pid = MyProcPid;
1296  SpinLockRelease(&SlotSyncCtx->mutex);
1297 
1298  ereport(LOG, errmsg("slot sync worker started"));
1299 
1300  /* Register it as soon as SlotSyncCtx->pid is initialized. */
1301  before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
1302 
1303  /* Setup signal handling */
1304  pqsignal(SIGHUP, SignalHandlerForConfigReload);
1305  pqsignal(SIGINT, SignalHandlerForShutdownRequest);
1306  pqsignal(SIGTERM, die);
1307  pqsignal(SIGFPE, FloatExceptionHandler);
1308  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
1309  pqsignal(SIGUSR2, SIG_IGN);
1310  pqsignal(SIGPIPE, SIG_IGN);
1311  pqsignal(SIGCHLD, SIG_DFL);
1312 
1313  /*
1314  * Establishes SIGALRM handler and initialize timeout module. It is needed
1315  * by InitPostgres to register different timeouts.
1316  */
1317  InitializeTimeouts();
1318 
1319  /* Load the libpq-specific functions */
1320  load_file("libpqwalreceiver", false);
1321 
1322  /*
1323  * If an exception is encountered, processing resumes here.
1324  *
1325  * We just need to clean up, report the error, and go away.
1326  *
1327  * If we do not have this handling here, then since this worker process
1328  * operates at the bottom of the exception stack, ERRORs turn into FATALs.
1329  * Therefore, we create our own exception handler to catch ERRORs.
1330  */
1331  if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1332  {
1333  /* since not using PG_TRY, must reset error stack by hand */
1334  error_context_stack = NULL;
1335 
1336  /* Prevents interrupts while cleaning up */
1337  HOLD_INTERRUPTS();
1338 
1339  /* Report the error to the server log */
1340  EmitErrorReport();
1341 
1342  /*
1343  * We can now go away. Note that because we called InitProcess, a
1344  * callback was registered to do ProcKill, which will clean up
1345  * necessary state.
1346  */
1347  proc_exit(0);
1348  }
1349 
1350  /* We can now handle ereport(ERROR) */
1351  PG_exception_stack = &local_sigjmp_buf;
1352 
1353  /*
1354  * Unblock signals (they were blocked when the postmaster forked us)
1355  */
1356  sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
1357 
1358  /*
1359  * Set always-secure search path, so malicious users can't redirect user
1360  * code (e.g. operators).
1361  *
1362  * It's not strictly necessary since we won't be scanning or writing to
1363  * any user table locally, but it's good to retain it here for added
1364  * precaution.
1365  */
1366  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
1367 
1368  dbname = CheckAndGetDbnameFromConninfo();
1369 
1370  /*
1371  * Connect to the database specified by the user in primary_conninfo. We
1372  * need a database connection for walrcv_exec to work which we use to
1373  * fetch slot information from the remote node. See comments atop
1374  * libpqrcv_exec.
1375  *
1376  * We do not specify a specific user here since the slot sync worker will
1377  * operate as a superuser. This is safe because the slot sync worker does
1378  * not interact with user tables, eliminating the risk of executing
1379  * arbitrary code within triggers.
1380  */
1381  InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
1382 
1383  SetProcessingMode(NormalProcessing);
1384 
1385  initStringInfo(&app_name);
1386  if (cluster_name[0])
1387  appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
1388  else
1389  appendStringInfoString(&app_name, "slotsync worker");
1390 
1391  /*
1392  * Establish the connection to the primary server for slot
1393  * synchronization.
1394  */
1395  wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
1396  app_name.data, &err);
1397  pfree(app_name.data);
1398 
1399  if (!wrconn)
1400  ereport(ERROR,
1401  errcode(ERRCODE_CONNECTION_FAILURE),
1402  errmsg("could not connect to the primary server: %s", err));
1403 
1404  /*
1405  * Register the failure callback once we have the connection.
1406  *
1407  * XXX: This can be combined with previous such cleanup registration of
1408  * slotsync_worker_onexit() but that will need the connection to be made
1409  * global and we want to avoid introducing global for this purpose.
1410  */
1411  before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
1412 
1413  /*
1414  * Using the specified primary server connection, check that we are not a
1415  * cascading standby and slot configured in 'primary_slot_name' exists on
1416  * the primary server.
1417  */
1418  validate_remote_info(wrconn);
1419 
1420  /* Main loop to synchronize slots */
1421  for (;;)
1422  {
1423  bool some_slot_updated = false;
1424 
1425  ProcessSlotSyncInterrupts(wrconn);
1426 
1427  some_slot_updated = synchronize_slots(wrconn);
1428 
1429  wait_for_slot_activity(some_slot_updated);
1430  }
1431 
1432  /*
1433  * The slot sync worker can't get here because it will only stop when it
1434  * receives a SIGINT from the startup process, or when there is an error.
1435  */
1436  Assert(false);
1437 }
1438 
1439 /*
1440  * Update the inactive_since property for synced slots.
1441  *
1442  * Note that this function is currently called when we shutdown the slot
1443  * sync machinery.
1444  */
1445 static void
1446 update_synced_slots_inactive_since(void)
1447 {
1448  TimestampTz now = 0;
1449 
1450  /*
1451  * We need to update inactive_since only when we are promoting standby to
1452  * correctly interpret the inactive_since if the standby gets promoted
1453  * without a restart. We don't want the slots to appear inactive for a
1454  * long time after promotion if they haven't been synchronized recently.
1455  * Whoever acquires the slot, i.e., makes the slot active, will reset it.
1456  */
1457  if (!StandbyMode)
1458  return;
1459 
1460  /* The slot sync worker mustn't be running by now */
1461  Assert(SlotSyncCtx->pid == InvalidPid);
1462 
1463  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1464 
1465  for (int i = 0; i < max_replication_slots; i++)
1466  {
1467  ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1468 
1469  /* Check if it is a synchronized slot */
1470  if (s->in_use && s->data.synced)
1471  {
1472  Assert(SlotIsLogical(s));
1473 
1474  /* Use the same inactive_since time for all the slots. */
1475  if (now == 0)
1476  now = GetCurrentTimestamp();
1477 
1478  SpinLockAcquire(&s->mutex);
1479  s->inactive_since = now;
1480  SpinLockRelease(&s->mutex);
1481  }
1482  }
1483 
1484  LWLockRelease(ReplicationSlotControlLock);
1485 }
1486 
1487 /*
1488  * Shut down the slot sync worker.
1489  */
1490 void
1491 ShutDownSlotSync(void)
1492 {
1493  SpinLockAcquire(&SlotSyncCtx->mutex);
1494 
1495  SlotSyncCtx->stopSignaled = true;
1496 
1497  if (SlotSyncCtx->pid == InvalidPid)
1498  {
1499  SpinLockRelease(&SlotSyncCtx->mutex);
1500  update_synced_slots_inactive_since();
1501  return;
1502  }
1503  SpinLockRelease(&SlotSyncCtx->mutex);
1504 
1505  kill(SlotSyncCtx->pid, SIGINT);
1506 
1507  /* Wait for it to die */
1508  for (;;)
1509  {
1510  int rc;
1511 
1512  /* Wait a bit, we don't expect to have to wait long */
1513  rc = WaitLatch(MyLatch,
1514  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1515  10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1516 
1517  if (rc & WL_LATCH_SET)
1518  {
1519  ResetLatch(MyLatch);
1520  CHECK_FOR_INTERRUPTS();
1521  }
1522 
1524 
1525  /* Is it gone? */
1526  if (SlotSyncCtx->pid == InvalidPid)
1527  break;
1528 
1530  }
1531 
1532  SpinLockRelease(&SlotSyncCtx->mutex);
1533 
1534  update_synced_slots_inactive_since();
1535 }
1536 
1537 /*
1538  * SlotSyncWorkerCanRestart
1539  *
1540  * Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
1541  * since it was launched last. Otherwise returns false.
1542  *
1543  * This is a safety valve to protect against continuous respawn attempts if the
1544  * worker is dying immediately at launch. Note that since we will retry to
1545  * launch the worker from the postmaster main loop, we will get another
1546  * chance later.
1547  */
1548 bool
1549 SlotSyncWorkerCanRestart(void)
1550 {
1551  time_t curtime = time(NULL);
1552 
1553  /* Return false if too soon since last start. */
1554  if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
1555  (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
1556  return false;
1557 
1558  SlotSyncCtx->last_start_time = curtime;
1559 
1560  return true;
1561 }
1562 
1563 /*
1564  * Is current process syncing replication slots?
1565  *
1566  * Could be either backend executing SQL function or slot sync worker.
1567  */
1568 bool
1569 IsSyncingReplicationSlots(void)
1570 {
1571  return syncing_slots;
1572 }
1573 
1574 /*
1575  * Amount of shared memory required for slot synchronization.
1576  */
1577 Size
1578 SlotSyncShmemSize(void)
1579 {
1580  return sizeof(SlotSyncCtxStruct);
1581 }
1582 
1583 /*
1584  * Allocate and initialize the shared memory of slot synchronization.
1585  */
1586 void
1587 SlotSyncShmemInit(void)
1588 {
1589  Size size = SlotSyncShmemSize();
1590  bool found;
1591 
1592  SlotSyncCtx = (SlotSyncCtxStruct *)
1593  ShmemInitStruct("Slot Sync Data", size, &found);
1594 
1595  if (!found)
1596  {
1597  memset(SlotSyncCtx, 0, size);
1598  SlotSyncCtx->pid = InvalidPid;
1599  SpinLockInit(&SlotSyncCtx->mutex);
1600  }
1601 }
1602 
1603 /*
1604  * Error cleanup callback for slot synchronization.
1605  */
1606 static void
1607 slotsync_failure_callback(int code, Datum arg)
1608 {
1609  WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
1610 
1611  if (syncing_slots)
1612  {
1613  /*
1614  * If syncing_slots is true, it indicates that the process errored out
1615  * without resetting the flag. So, we need to clean up shared memory
1616  * and reset the flag here.
1617  */
1618  SpinLockAcquire(&SlotSyncCtx->mutex);
1619  SlotSyncCtx->syncing = false;
1620  SpinLockRelease(&SlotSyncCtx->mutex);
1621 
1622  syncing_slots = false;
1623  }
1624 
1625  walrcv_disconnect(wrconn);
1626 }
1627 
1628 /*
1629  * Synchronize the failover enabled replication slots using the specified
1630  * primary server connection.
1631  */
1632 void
1633 SyncReplicationSlots(WalReceiverConn *wrconn)
1634 {
1635  PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
1636  {
1637  validate_remote_info(wrconn);
1638 
1639  synchronize_slots(wrconn);
1640  }
1641  PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
1642 }
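
A minimal sketch of how a backend-side caller could drive a one-off synchronization using the entry points above, assuming the standby GUCs are already configured; the function name and flow are illustrative (the real SQL-callable wrapper lives outside this file), and error handling is trimmed:

static void
sync_failover_slots_once(void)
{
	WalReceiverConn *wrconn;
	char	   *err = NULL;

	/* Error out if the required GUCs are not set appropriately. */
	(void) ValidateSlotSyncParams(ERROR);

	/* libpqwalreceiver provides the walrcv_* functions used below. */
	load_file("libpqwalreceiver", false);

	/* Connect to the primary using primary_conninfo (must include dbname). */
	wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
							"slotsync usage sketch", &err);
	if (wrconn == NULL)
		ereport(ERROR,
				errcode(ERRCODE_CONNECTION_FAILURE),
				errmsg("could not connect to the primary server: %s", err));

	/* Fetch the failover slots from the primary and sync them locally. */
	SyncReplicationSlots(wrconn);

	walrcv_disconnect(wrconn);
}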