PostgreSQL Source Code  git master
slotsync.c File Reference
#include "postgres.h"
#include <time.h>
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
#include "libpq/pqsignal.h"
#include "pgstat.h"
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/snapbuild.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
Include dependency graph for slotsync.c:

Go to the source code of this file.

Data Structures

struct  SlotSyncCtxStruct
 
struct  RemoteSlot
 

Macros

#define MIN_SLOTSYNC_WORKER_NAPTIME_MS   200
 
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS   30000 /* 30s */
 
#define SLOTSYNC_RESTART_INTERVAL_SEC   10
 
#define SLOTSYNC_COLUMN_COUNT   9
 
#define PRIMARY_INFO_OUTPUT_COL_COUNT   2
 

Typedefs

typedef struct SlotSyncCtxStruct SlotSyncCtxStruct
 
typedef struct RemoteSlot RemoteSlot
 

Functions

static void slotsync_failure_callback (int code, Datum arg)
 
static void update_synced_slots_inactive_since (void)
 
static bool update_local_synced_slot (RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
 
static List * get_local_synced_slots (void)
 
static bool local_sync_slot_required (ReplicationSlot *local_slot, List *remote_slots)
 
static void drop_local_obsolete_slots (List *remote_slot_list)
 
static void reserve_wal_for_local_slot (XLogRecPtr restart_lsn)
 
static bool update_and_persist_local_synced_slot (RemoteSlot *remote_slot, Oid remote_dbid)
 
static bool synchronize_one_slot (RemoteSlot *remote_slot, Oid remote_dbid)
 
static bool synchronize_slots (WalReceiverConn *wrconn)
 
static void validate_remote_info (WalReceiverConn *wrconn)
 
char * CheckAndGetDbnameFromConninfo (void)
 
bool ValidateSlotSyncParams (int elevel)
 
static void slotsync_reread_config (void)
 
static void ProcessSlotSyncInterrupts (WalReceiverConn *wrconn)
 
static void slotsync_worker_onexit (int code, Datum arg)
 
static void wait_for_slot_activity (bool some_slot_updated)
 
void ReplSlotSyncWorkerMain (char *startup_data, size_t startup_data_len)
 
void ShutDownSlotSync (void)
 
bool SlotSyncWorkerCanRestart (void)
 
bool IsSyncingReplicationSlots (void)
 
Size SlotSyncShmemSize (void)
 
void SlotSyncShmemInit (void)
 
void SyncReplicationSlots (WalReceiverConn *wrconn)
 

Variables

SlotSyncCtxStruct * SlotSyncCtx = NULL
 
bool sync_replication_slots = false
 
static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS
 
static bool syncing_slots = false
 

Macro Definition Documentation

◆ MAX_SLOTSYNC_WORKER_NAPTIME_MS

#define MAX_SLOTSYNC_WORKER_NAPTIME_MS   30000 /* 30s */

Definition at line 119 of file slotsync.c.

◆ MIN_SLOTSYNC_WORKER_NAPTIME_MS

#define MIN_SLOTSYNC_WORKER_NAPTIME_MS   200

Definition at line 118 of file slotsync.c.

◆ PRIMARY_INFO_OUTPUT_COL_COUNT

#define PRIMARY_INFO_OUTPUT_COL_COUNT   2

◆ SLOTSYNC_COLUMN_COUNT

#define SLOTSYNC_COLUMN_COUNT   9

◆ SLOTSYNC_RESTART_INTERVAL_SEC

#define SLOTSYNC_RESTART_INTERVAL_SEC   10

Definition at line 124 of file slotsync.c.

Typedef Documentation

◆ RemoteSlot

typedef struct RemoteSlot RemoteSlot

◆ SlotSyncCtxStruct

typedef struct SlotSyncCtxStruct SlotSyncCtxStruct

Function Documentation

◆ CheckAndGetDbnameFromConninfo()

char* CheckAndGetDbnameFromConninfo ( void  )

Definition at line 1035 of file slotsync.c.

1036 {
1037  char *dbname;
1038 
1039  /*
1040  * The slot synchronization needs a database connection for walrcv_exec to
1041  * work.
1042  */
1043  dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
1044  if (dbname == NULL)
1045  ereport(ERROR,
1046 
1047  /*
1048  * translator: dbname is a specific option; %s is a GUC variable name
1049  */
1050  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1051  errmsg("slot synchronization requires dbname to be specified in %s",
1052  "primary_conninfo"));
1053  return dbname;
1054 }
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
char * dbname
Definition: streamutil.c:52
#define walrcv_get_dbname_from_conninfo(conninfo)
Definition: walreceiver.h:442
char * PrimaryConnInfo
Definition: xlogrecovery.c:96

References dbname, ereport, errcode(), errmsg(), ERROR, PrimaryConnInfo, and walrcv_get_dbname_from_conninfo.

Referenced by pg_sync_replication_slots(), and ReplSlotSyncWorkerMain().

◆ drop_local_obsolete_slots()

static void drop_local_obsolete_slots ( List * remote_slot_list)
static

Definition at line 419 of file slotsync.c.

420 {
421  List *local_slots = get_local_synced_slots();
422 
423  foreach_ptr(ReplicationSlot, local_slot, local_slots)
424  {
425  /* Drop the local slot if it is not required to be retained. */
426  if (!local_sync_slot_required(local_slot, remote_slot_list))
427  {
428  bool synced_slot;
429 
430  /*
431  * Use shared lock to prevent a conflict with
432  * ReplicationSlotsDropDBSlots(), trying to drop the same slot
433  * during a drop-database operation.
434  */
435  LockSharedObject(DatabaseRelationId, local_slot->data.database,
436  0, AccessShareLock);
437 
438  /*
439  * In the small window between getting the slot to drop and
440  * locking the database, there is a possibility of a parallel
441  * database drop by the startup process and the creation of a new
442  * slot by the user. This new user-created slot may end up using
443  * the same shared memory as that of 'local_slot'. Thus check if
444  * local_slot is still the synced one before performing actual
445  * drop.
446  */
447  SpinLockAcquire(&local_slot->mutex);
448  synced_slot = local_slot->in_use && local_slot->data.synced;
449  SpinLockRelease(&local_slot->mutex);
450 
451  if (synced_slot)
452  {
453  ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
454  ReplicationSlotDropAcquired();
455  }
456 
457  UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
458  0, AccessShareLock);
459 
460  ereport(LOG,
461  errmsg("dropped replication slot \"%s\" of dbid %d",
462  NameStr(local_slot->data.name),
463  local_slot->data.database));
464  }
465  }
466 }
#define NameStr(name)
Definition: c.h:746
#define LOG
Definition: elog.h:31
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1083
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1142
#define AccessShareLock
Definition: lockdefs.h:36
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
void ReplicationSlotDropAcquired(void)
Definition: slot.c:864
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:540
static bool local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
Definition: slotsync.c:366
static List * get_local_synced_slots(void)
Definition: slotsync.c:335
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
Definition: pg_list.h:54

References AccessShareLock, ereport, errmsg(), foreach_ptr, get_local_synced_slots(), local_sync_slot_required(), LockSharedObject(), LOG, NameStr, ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), SpinLockAcquire, SpinLockRelease, and UnlockSharedObject().

Referenced by synchronize_slots().

◆ get_local_synced_slots()

static List* get_local_synced_slots ( void  )
static

Definition at line 335 of file slotsync.c.

336 {
337  List *local_slots = NIL;
338 
339  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
340 
341  for (int i = 0; i < max_replication_slots; i++)
342  {
343  ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
344 
345  /* Check if it is a synchronized slot */
346  if (s->in_use && s->data.synced)
347  {
348  Assert(SlotIsLogical(s));
349  local_slots = lappend(local_slots, s);
350  }
351  }
352 
353  LWLockRelease(ReplicationSlotControlLock);
354 
355  return local_slots;
356 }
#define Assert(condition)
Definition: c.h:858
int i
Definition: isn.c:73
List * lappend(List *list, void *datum)
Definition: list.c:339
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1170
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1783
@ LW_SHARED
Definition: lwlock.h:115
#define NIL
Definition: pg_list.h:68
int max_replication_slots
Definition: slot.c:141
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:135
#define SlotIsLogical(slot)
Definition: slot.h:210
ReplicationSlot replication_slots[1]
Definition: slot.h:221
bool in_use
Definition: slot.h:154
ReplicationSlotPersistentData data
Definition: slot.h:178

References Assert, ReplicationSlot::data, i, ReplicationSlot::in_use, lappend(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, NIL, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, and ReplicationSlotPersistentData::synced.

Referenced by drop_local_obsolete_slots().

◆ IsSyncingReplicationSlots()

bool IsSyncingReplicationSlots ( void  )

Definition at line 1569 of file slotsync.c.

1570 {
1571  return syncing_slots;
1572 }
static bool syncing_slots
Definition: slotsync.c:131

References syncing_slots.

Referenced by CreateDecodingContext(), GetStandbyFlushRecPtr(), and ReplicationSlotCreate().

◆ local_sync_slot_required()

static bool local_sync_slot_required ( ReplicationSlot * local_slot,
List * remote_slots 
)
static

Definition at line 366 of file slotsync.c.

367 {
368  bool remote_exists = false;
369  bool locally_invalidated = false;
370 
371  foreach_ptr(RemoteSlot, remote_slot, remote_slots)
372  {
373  if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
374  {
375  remote_exists = true;
376 
377  /*
378  * If remote slot is not invalidated but local slot is marked as
379  * invalidated, then set locally_invalidated flag.
380  */
381  SpinLockAcquire(&local_slot->mutex);
382  locally_invalidated =
383  (remote_slot->invalidated == RS_INVAL_NONE) &&
384  (local_slot->data.invalidated != RS_INVAL_NONE);
385  SpinLockRelease(&local_slot->mutex);
386 
387  break;
388  }
389  }
390 
391  return (remote_exists && !locally_invalidated);
392 }
@ RS_INVAL_NONE
Definition: slot.h:49
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:96
slock_t mutex
Definition: slot.h:151

References ReplicationSlot::data, foreach_ptr, ReplicationSlotPersistentData::invalidated, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, RS_INVAL_NONE, SpinLockAcquire, and SpinLockRelease.

Referenced by drop_local_obsolete_slots().

◆ ProcessSlotSyncInterrupts()

static void ProcessSlotSyncInterrupts ( WalReceiverConn * wrconn)
static

Definition at line 1177 of file slotsync.c.

1178 {
1179  CHECK_FOR_INTERRUPTS();
1180 
1181  if (ShutdownRequestPending)
1182  {
1183  ereport(LOG,
1184  errmsg("slot sync worker is shutting down on receiving SIGINT"));
1185 
1186  proc_exit(0);
1187  }
1188 
1189  if (ConfigReloadPending)
1190  slotsync_reread_config();
1191 }
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void proc_exit(int code)
Definition: ipc.c:104
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static void slotsync_reread_config(void)
Definition: slotsync.c:1128

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, ereport, errmsg(), LOG, proc_exit(), ShutdownRequestPending, and slotsync_reread_config().

Referenced by ReplSlotSyncWorkerMain().

◆ ReplSlotSyncWorkerMain()

void ReplSlotSyncWorkerMain ( char *  startup_data,
size_t  startup_data_len 
)

Definition at line 1252 of file slotsync.c.

1253 {
1254  WalReceiverConn *wrconn = NULL;
1255  char *dbname;
1256  char *err;
1257  sigjmp_buf local_sigjmp_buf;
1258  StringInfoData app_name;
1259 
1260  Assert(startup_data_len == 0);
1261 
1263 
1264  init_ps_display(NULL);
1265 
1267 
1268  /*
1269  * Create a per-backend PGPROC struct in shared memory. We must do this
1270  * before we access any shared memory.
1271  */
1272  InitProcess();
1273 
1274  /*
1275  * Early initialization.
1276  */
1277  BaseInit();
1278 
1279  Assert(SlotSyncCtx != NULL);
1280 
1283 
1284  /*
1285  * Startup process signaled the slot sync worker to stop, so if meanwhile
1286  * postmaster ended up starting the worker again, exit.
1287  */
1289  {
1291  proc_exit(0);
1292  }
1293 
1294  /* Advertise our PID so that the startup process can kill us on promotion */
1297 
1298  ereport(LOG, errmsg("slot sync worker started"));
1299 
1300  /* Register it as soon as SlotSyncCtx->pid is initialized. */
1302 
1303  /* Setup signal handling */
1306  pqsignal(SIGTERM, die);
1312 
1313  /*
1314  * Establishes SIGALRM handler and initialize timeout module. It is needed
1315  * by InitPostgres to register different timeouts.
1316  */
1318 
1319  /* Load the libpq-specific functions */
1320  load_file("libpqwalreceiver", false);
1321 
1322  /*
1323  * If an exception is encountered, processing resumes here.
1324  *
1325  * We just need to clean up, report the error, and go away.
1326  *
1327  * If we do not have this handling here, then since this worker process
1328  * operates at the bottom of the exception stack, ERRORs turn into FATALs.
1329  * Therefore, we create our own exception handler to catch ERRORs.
1330  */
1331  if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1332  {
1333  /* since not using PG_TRY, must reset error stack by hand */
1334  error_context_stack = NULL;
1335 
1336  /* Prevents interrupts while cleaning up */
1337  HOLD_INTERRUPTS();
1338 
1339  /* Report the error to the server log */
1340  EmitErrorReport();
1341 
1342  /*
1343  * We can now go away. Note that because we called InitProcess, a
1344  * callback was registered to do ProcKill, which will clean up
1345  * necessary state.
1346  */
1347  proc_exit(0);
1348  }
1349 
1350  /* We can now handle ereport(ERROR) */
1351  PG_exception_stack = &local_sigjmp_buf;
1352 
1353  /*
1354  * Unblock signals (they were blocked when the postmaster forked us)
1355  */
1356  sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
1357 
1358  /*
1359  * Set always-secure search path, so malicious users can't redirect user
1360  * code (e.g. operators).
1361  *
1362  * It's not strictly necessary since we won't be scanning or writing to
1363  * any user table locally, but it's good to retain it here for added
1364  * precaution.
1365  */
1366  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
1367 
1369 
1370  /*
1371  * Connect to the database specified by the user in primary_conninfo. We
1372  * need a database connection for walrcv_exec to work which we use to
1373  * fetch slot information from the remote node. See comments atop
1374  * libpqrcv_exec.
1375  *
1376  * We do not specify a specific user here since the slot sync worker will
1377  * operate as a superuser. This is safe because the slot sync worker does
1378  * not interact with user tables, eliminating the risk of executing
1379  * arbitrary code within triggers.
1380  */
1381  InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
1382 
1384 
1385  initStringInfo(&app_name);
1386  if (cluster_name[0])
1387  appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
1388  else
1389  appendStringInfoString(&app_name, "slotsync worker");
1390 
1391  /*
1392  * Establish the connection to the primary server for slot
1393  * synchronization.
1394  */
1395  wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
1396  app_name.data, &err);
1397  pfree(app_name.data);
1398 
1399  if (!wrconn)
1400  ereport(ERROR,
1401  errcode(ERRCODE_CONNECTION_FAILURE),
1402  errmsg("could not connect to the primary server: %s", err));
1403 
1404  /*
1405  * Register the failure callback once we have the connection.
1406  *
1407  * XXX: This can be combined with previous such cleanup registration of
1408  * slotsync_worker_onexit() but that will need the connection to be made
1409  * global and we want to avoid introducing global for this purpose.
1410  */
1412 
1413  /*
1414  * Using the specified primary server connection, check that we are not a
1415  * cascading standby and slot configured in 'primary_slot_name' exists on
1416  * the primary server.
1417  */
1419 
1420  /* Main loop to synchronize slots */
1421  for (;;)
1422  {
1423  bool some_slot_updated = false;
1424 
1426 
1427  some_slot_updated = synchronize_slots(wrconn);
1428 
1429  wait_for_slot_activity(some_slot_updated);
1430  }
1431 
1432  /*
1433  * The slot sync worker can't get here because it will only stop when it
1434  * receives a SIGINT from the startup process, or when there is an error.
1435  */
1436  Assert(false);
1437 }
sigset_t UnBlockSig
Definition: pqsignal.c:22
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
void EmitErrorReport(void)
Definition: elog.c:1672
ErrorContextCallback * error_context_stack
Definition: elog.c:94
sigjmp_buf * PG_exception_stack
Definition: elog.c:96
void err(int eval, const char *fmt,...)
Definition: err.c:43
int MyProcPid
Definition: globals.c:45
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4275
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
char * cluster_name
Definition: guc_tables.c:540
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:105
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void pfree(void *pointer)
Definition: mcxt.c:1520
@ NormalProcessing
Definition: miscadmin.h:449
@ InitProcessing
Definition: miscadmin.h:448
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
#define SetProcessingMode(mode)
Definition: miscadmin.h:460
@ B_SLOTSYNC_WORKER
Definition: miscadmin.h:343
#define InvalidPid
Definition: miscadmin.h:32
BackendType MyBackendType
Definition: miscinit.c:63
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void FloatExceptionHandler(SIGNAL_ARGS)
Definition: postgres.c:3019
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
void BaseInit(void)
Definition: postinit.c:645
void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname)
Definition: postinit.c:736
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:635
void init_ps_display(const char *fixed_part)
Definition: ps_status.c:267
SlotSyncCtxStruct * SlotSyncCtx
Definition: slotsync.c:108
static void slotsync_failure_callback(int code, Datum arg)
Definition: slotsync.c:1607
static bool synchronize_slots(WalReceiverConn *wrconn)
Definition: slotsync.c:793
static void wait_for_slot_activity(bool some_slot_updated)
Definition: slotsync.c:1215
char * CheckAndGetDbnameFromConninfo(void)
Definition: slotsync.c:1035
static void slotsync_worker_onexit(int code, Datum arg)
Definition: slotsync.c:1199
static void validate_remote_info(WalReceiverConn *wrconn)
Definition: slotsync.c:956
static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
Definition: slotsync.c:1177
void InitProcess(void)
Definition: proc.c:296
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void InitializeTimeouts(void)
Definition: timeout.c:470
static WalReceiverConn * wrconn
Definition: walreceiver.c:92
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:432
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References appendStringInfo(), appendStringInfoString(), Assert, B_SLOTSYNC_WORKER, BaseInit(), before_shmem_exit(), CheckAndGetDbnameFromConninfo(), cluster_name, StringInfoData::data, dbname, die, EmitErrorReport(), ereport, err(), errcode(), errmsg(), ERROR, error_context_stack, FloatExceptionHandler(), HOLD_INTERRUPTS, init_ps_display(), InitializeTimeouts(), InitPostgres(), InitProcess(), InitProcessing, initStringInfo(), InvalidOid, InvalidPid, load_file(), LOG, SlotSyncCtxStruct::mutex, MyBackendType, MyProcPid, NormalProcessing, pfree(), PG_exception_stack, PGC_S_OVERRIDE, PGC_SUSET, SlotSyncCtxStruct::pid, PointerGetDatum(), pqsignal(), PrimaryConnInfo, proc_exit(), ProcessSlotSyncInterrupts(), procsignal_sigusr1_handler(), SetConfigOption(), SetProcessingMode, SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, slotsync_failure_callback(), slotsync_worker_onexit(), SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::stopSignaled, synchronize_slots(), UnBlockSig, validate_remote_info(), wait_for_slot_activity(), walrcv_connect, and wrconn.

◆ reserve_wal_for_local_slot()

static void reserve_wal_for_local_slot ( XLogRecPtr  restart_lsn)
static

Definition at line 476 of file slotsync.c.

477 {
478  XLogSegNo oldest_segno;
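479  XLogSegNo segno;
480  ReplicationSlot *slot = MyReplicationSlot;
481 
482  Assert(slot != NULL);
483  Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn));
484 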
485  while (true)
486  {
487  SpinLockAcquire(&slot->mutex);
488  slot->data.restart_lsn = restart_lsn;
489  SpinLockRelease(&slot->mutex);
490 
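491  /* Prevent WAL removal as fast as possible */
492  ReplicationSlotsComputeRequiredLSN();
493 
494  XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
495 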
496  /*
497  * Find the oldest existing WAL segment file.
498  *
499  * Normally, we can determine it by using the last removed segment
500  * number. However, if no WAL segment files have been removed by a
501  * checkpoint since startup, we need to search for the oldest segment
502  * file from the current timeline existing in XLOGDIR.
503  *
504  * XXX: Currently, we are searching for the oldest segment in the
505  * current timeline as there is less chance of the slot's restart_lsn
506  * from being some prior timeline, and even if it happens, in the
507  * worst case, we will wait to sync till the slot's restart_lsn moved
508  * to the current timeline.
509  */
510  oldest_segno = XLogGetLastRemovedSegno() + 1;
511 
512  if (oldest_segno == 1)
513  {
514  TimeLineID cur_timeline;
515 
516  GetWalRcvFlushRecPtr(NULL, &cur_timeline);
517  oldest_segno = XLogGetOldestSegno(cur_timeline);
518  }
519 
520  elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
521  segno, oldest_segno);
522 
523  /*
524  * If all required WAL is still there, great, otherwise retry. The
525  * slot should prevent further removal of WAL, unless there's a
526  * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
527  * the new restart_lsn above, so normally we should never need to loop
528  * more than twice.
529  */
530  if (segno >= oldest_segno)
531  break;
532 
533  /* Retry using the location of the oldest wal segment */
534  XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
535  }
536 }
#define UINT64_FORMAT
Definition: c.h:549
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:224
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1101
XLogRecPtr restart_lsn
Definition: slot.h:93
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3747
int wal_segment_size
Definition: xlog.c:143
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
Definition: xlog.c:3763
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert, ReplicationSlot::data, DEBUG1, elog, GetWalRcvFlushRecPtr(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, SpinLockRelease, UINT64_FORMAT, wal_segment_size, XLByteToSeg, XLogGetLastRemovedSegno(), XLogGetOldestSegno(), XLogRecPtrIsInvalid, and XLogSegNoOffsetToRecPtr.

Referenced by synchronize_one_slot().

◆ ShutDownSlotSync()

void ShutDownSlotSync ( void  )

Definition at line 1491 of file slotsync.c.

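1492 {
1493  SpinLockAcquire(&SlotSyncCtx->mutex);
1494 
1495  SlotSyncCtx->stopSignaled = true;
1496 
1497  if (SlotSyncCtx->pid == InvalidPid)
1498  {
1499  SpinLockRelease(&SlotSyncCtx->mutex);
1500  update_synced_slots_inactive_since();
1501  return;
1502  }
1503  SpinLockRelease(&SlotSyncCtx->mutex);
1504 
1505  kill(SlotSyncCtx->pid, SIGINT);
1506 
1507  /* Wait for it to die */
1508  for (;;)
1509  {
1510  int rc;
1511 
1512  /* Wait a bit, we don't expect to have to wait long */
1513  rc = WaitLatch(MyLatch,
1514  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1515  10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1516 
1517  if (rc & WL_LATCH_SET)
1518  {
1519  ResetLatch(MyLatch);
1520  CHECK_FOR_INTERRUPTS();
1521  }
1522 
1523  SpinLockAcquire(&SlotSyncCtx->mutex);
1524 
1525  /* Is it gone? */
1526  if (SlotSyncCtx->pid == InvalidPid)
1527  break;
1528 
1529  SpinLockRelease(&SlotSyncCtx->mutex);
1530  }
1531 
1532  SpinLockRelease(&SlotSyncCtx->mutex);
1533 
1534  update_synced_slots_inactive_since();
1535 }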
struct Latch * MyLatch
Definition: globals.c:60
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static void update_synced_slots_inactive_since(void)
Definition: slotsync.c:1446
#define kill(pid, sig)
Definition: win32_port.h:485

References CHECK_FOR_INTERRUPTS, InvalidPid, kill, SlotSyncCtxStruct::mutex, MyLatch, SlotSyncCtxStruct::pid, ResetLatch(), SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::stopSignaled, update_synced_slots_inactive_since(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by FinishWalRecovery().

◆ slotsync_failure_callback()

static void slotsync_failure_callback ( int  code,
Datum  arg 
)
static

Definition at line 1607 of file slotsync.c.

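1608 {
1609  WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
1610 
1611  if (syncing_slots)
1612  {
1613  /*
1614  * If syncing_slots is true, it indicates that the process errored out
1615  * without resetting the flag. So, we need to clean up shared memory
1616  * and reset the flag here.
1617  */
1618  SpinLockAcquire(&SlotSyncCtx->mutex);
1619  SlotSyncCtx->syncing = false;
1620  SpinLockRelease(&SlotSyncCtx->mutex);
1621 
1622  syncing_slots = false;
1623  }
1624 
1625  walrcv_disconnect(wrconn);
1626 }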
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
#define walrcv_disconnect(conn)
Definition: walreceiver.h:464

References arg, DatumGetPointer(), SlotSyncCtxStruct::mutex, SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::syncing, syncing_slots, walrcv_disconnect, and wrconn.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ slotsync_reread_config()

static void slotsync_reread_config ( void  )
static

Definition at line 1128 of file slotsync.c.

1129 {
1130  char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
1131  char *old_primary_slotname = pstrdup(PrimarySlotName);
1132  bool old_sync_replication_slots = sync_replication_slots;
1133  bool old_hot_standby_feedback = hot_standby_feedback;
1134  bool conninfo_changed;
1135  bool primary_slotname_changed;
1136 
1137  Assert(sync_replication_slots);
1138 
1139  ConfigReloadPending = false;
1140  ProcessConfigFile(PGC_SIGHUP);
1141 
1142  conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
1143  primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
1144  pfree(old_primary_conninfo);
1145  pfree(old_primary_slotname);
1146 
1147  if (old_sync_replication_slots != sync_replication_slots)
1148  {
1149  ereport(LOG,
1150  /* translator: %s is a GUC variable name */
1151  errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots"));
1152  proc_exit(0);
1153  }
1154 
1155  if (conninfo_changed ||
1156  primary_slotname_changed ||
1157  (old_hot_standby_feedback != hot_standby_feedback))
1158  {
1159  ereport(LOG,
1160  errmsg("slot sync worker will restart because of a parameter change"));
1161 
1162  /*
1163  * Reset the last-start time for this worker so that the postmaster
1164  * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
1165  */
1166  SlotSyncCtx->last_start_time = 0;
1167 
1168  proc_exit(0);
1169  }
1170 
1171 }
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
char * pstrdup(const char *in)
Definition: mcxt.c:1695
bool sync_replication_slots
Definition: slotsync.c:111
time_t last_start_time
Definition: slotsync.c:104
bool hot_standby_feedback
Definition: walreceiver.c:89
char * PrimarySlotName
Definition: xlogrecovery.c:97

References Assert, ConfigReloadPending, ereport, errmsg(), hot_standby_feedback, SlotSyncCtxStruct::last_start_time, LOG, pfree(), PGC_SIGHUP, PrimaryConnInfo, PrimarySlotName, proc_exit(), ProcessConfigFile(), pstrdup(), SlotSyncCtx, and sync_replication_slots.

Referenced by ProcessSlotSyncInterrupts().

◆ slotsync_worker_onexit()

static void slotsync_worker_onexit ( int  code,
Datum  arg 
)
static

◆ SlotSyncShmemInit()

void SlotSyncShmemInit ( void  )

Definition at line 1587 of file slotsync.c.

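1588 {
1589  Size size = SlotSyncShmemSize();
1590  bool found;
1591 
1592  SlotSyncCtx = (SlotSyncCtxStruct *)
1593  ShmemInitStruct("Slot Sync Data", size, &found);
1594 
1595  if (!found)
1596  {
1597  memset(SlotSyncCtx, 0, size);
1598  SlotSyncCtx->pid = InvalidPid;
1599  SpinLockInit(&SlotSyncCtx->mutex);
1600  }
1601 }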
size_t Size
Definition: c.h:605
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
static pg_noinline void Size size
Definition: slab.c:607
Size SlotSyncShmemSize(void)
Definition: slotsync.c:1578
#define SpinLockInit(lock)
Definition: spin.h:60

References InvalidPid, SlotSyncCtxStruct::mutex, SlotSyncCtxStruct::pid, ShmemInitStruct(), size, SlotSyncCtx, SlotSyncShmemSize(), and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ SlotSyncShmemSize()

Size SlotSyncShmemSize ( void  )

Definition at line 1578 of file slotsync.c.

1579 {
1580  return sizeof(SlotSyncCtxStruct);
1581 }
struct SlotSyncCtxStruct SlotSyncCtxStruct

Referenced by CalculateShmemSize(), and SlotSyncShmemInit().

◆ SlotSyncWorkerCanRestart()

bool SlotSyncWorkerCanRestart ( void  )

Definition at line 1549 of file slotsync.c.

1550 {
1551  time_t curtime = time(NULL);
1552 
1553  /* Return false if too soon since last start. */
1554  if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
1555  (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
1556  return false;
1557 
1558  SlotSyncCtx->last_start_time = curtime;
1559 
1560  return true;
1561 }
#define SLOTSYNC_RESTART_INTERVAL_SEC
Definition: slotsync.c:124

References SlotSyncCtxStruct::last_start_time, SLOTSYNC_RESTART_INTERVAL_SEC, and SlotSyncCtx.

Referenced by MaybeStartSlotSyncWorker().

◆ synchronize_one_slot()

static bool synchronize_one_slot ( RemoteSlot remote_slot,
Oid  remote_dbid 
)
static

Definition at line 611 of file slotsync.c.

612 {
613  ReplicationSlot *slot;
614  XLogRecPtr latestFlushPtr;
615  bool slot_updated = false;
616 
617  /*
618  * Make sure that concerned WAL is received and flushed before syncing
619  * slot to target lsn received from the primary server.
620  */
621  latestFlushPtr = GetStandbyFlushRecPtr(NULL);
622  if (remote_slot->confirmed_lsn > latestFlushPtr)
623  {
624  /*
625  * Can get here only if GUC 'standby_slot_names' on the primary server
626  * was not configured correctly.
627  */
629  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
630  errmsg("skipping slot synchronization as the received slot sync"
631  " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
632  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
633  remote_slot->name,
634  LSN_FORMAT_ARGS(latestFlushPtr)));
635 
636  return false;
637  }
638 
639  /* Search for the named slot */
640  if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
641  {
642  bool synced;
643 
644  SpinLockAcquire(&slot->mutex);
645  synced = slot->data.synced;
646  SpinLockRelease(&slot->mutex);
647 
648  /* User-created slot with the same name exists, raise ERROR. */
649  if (!synced)
650  ereport(ERROR,
651  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652  errmsg("exiting from slot synchronization because same"
653  " name slot \"%s\" already exists on the standby",
654  remote_slot->name));
655 
656  /*
657  * The slot has been synchronized before.
658  *
659  * It is important to acquire the slot here before checking
660  * invalidation. If we don't acquire the slot first, there could be a
661  * race condition that the local slot could be invalidated just after
662  * checking the 'invalidated' flag here and we could end up
663  * overwriting 'invalidated' flag to remote_slot's value. See
664  * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
665  * if the slot is not acquired by other processes.
666  *
667  * XXX: If it ever turns out that slot acquire/release is costly for
668  * cases when none of the slot properties is changed then we can do a
669  * pre-check to ensure that at least one of the slot properties is
670  * changed before acquiring the slot.
671  */
672  ReplicationSlotAcquire(remote_slot->name, true);
673 
674  Assert(slot == MyReplicationSlot);
675 
676  /*
677  * Copy the invalidation cause from remote only if local slot is not
678  * invalidated locally, we don't want to overwrite existing one.
679  */
680  if (slot->data.invalidated == RS_INVAL_NONE &&
681  remote_slot->invalidated != RS_INVAL_NONE)
682  {
683  SpinLockAcquire(&slot->mutex);
684  slot->data.invalidated = remote_slot->invalidated;
685  SpinLockRelease(&slot->mutex);
686 
687  /* Make sure the invalidated state persists across server restart */
690 
691  slot_updated = true;
692  }
693 
694  /* Skip the sync of an invalidated slot */
695  if (slot->data.invalidated != RS_INVAL_NONE)
696  {
698  return slot_updated;
699  }
700 
701  /* Slot not ready yet, let's attempt to make it sync-ready now. */
702  if (slot->data.persistency == RS_TEMPORARY)
703  {
704  slot_updated = update_and_persist_local_synced_slot(remote_slot,
705  remote_dbid);
706  }
707 
708  /* Slot ready for sync, so sync it. */
709  else
710  {
711  /*
712  * Sanity check: As long as the invalidations are handled
713  * appropriately as above, this should never happen.
714  *
715  * We don't need to check restart_lsn here. See the comments in
716  * update_local_synced_slot() for details.
717  */
718  if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
719  ereport(ERROR,
720  errmsg_internal("cannot synchronize local slot \"%s\"",
721  remote_slot->name),
722  errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
724  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
725 
726  slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
727  NULL, NULL);
728  }
729  }
730  /* Otherwise create the slot first. */
731  else
732  {
733  NameData plugin_name;
734  TransactionId xmin_horizon = InvalidTransactionId;
735 
736  /* Skip creating the local slot if remote_slot is invalidated already */
737  if (remote_slot->invalidated != RS_INVAL_NONE)
738  return false;
739 
740  /*
741  * We create temporary slots instead of ephemeral slots here because
742  * we want the slots to survive after releasing them. This is done to
743  * avoid dropping and re-creating the slots in each synchronization
744  * cycle if the restart_lsn or catalog_xmin of the remote slot has not
745  * caught up.
746  */
747  ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
748  remote_slot->two_phase,
749  remote_slot->failover,
750  true);
751 
752  /* For shorter lines. */
753  slot = MyReplicationSlot;
754 
755  /* Avoid expensive operations while holding a spinlock. */
756  namestrcpy(&plugin_name, remote_slot->plugin);
757 
758  SpinLockAcquire(&slot->mutex);
759  slot->data.database = remote_dbid;
760  slot->data.plugin = plugin_name;
761  SpinLockRelease(&slot->mutex);
762 
764 
765  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
766  xmin_horizon = GetOldestSafeDecodingTransactionId(true);
767  SpinLockAcquire(&slot->mutex);
768  slot->effective_catalog_xmin = xmin_horizon;
769  slot->data.catalog_xmin = xmin_horizon;
770  SpinLockRelease(&slot->mutex);
772  LWLockRelease(ProcArrayLock);
773 
774  update_and_persist_local_synced_slot(remote_slot, remote_dbid);
775 
776  slot_updated = true;
777  }
778 
780 
781  return slot_updated;
782 }
uint32 TransactionId
Definition: c.h:652
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1159
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1232
@ LW_EXCLUSIVE
Definition: lwlock.h:114
#define AmLogicalSlotSyncWorkerProcess()
Definition: miscadmin.h:378
void namestrcpy(Name name, const char *str)
Definition: name.c:233
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition: procarray.c:2932
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:464
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:309
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1006
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1045
void ReplicationSlotSave(void)
Definition: slot.c:988
void ReplicationSlotRelease(void)
Definition: slot.c:652
@ RS_TEMPORARY
Definition: slot.h:37
static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
Definition: slotsync.c:476
static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
Definition: slotsync.c:547
static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
Definition: slotsync.c:170
bool two_phase
Definition: slotsync.c:142
char * plugin
Definition: slotsync.c:140
char * name
Definition: slotsync.c:139
bool failover
Definition: slotsync.c:143
ReplicationSlotInvalidationCause invalidated
Definition: slotsync.c:149
XLogRecPtr confirmed_lsn
Definition: slotsync.c:145
XLogRecPtr restart_lsn
Definition: slotsync.c:144
TransactionId catalog_xmin
Definition: slot.h:90
XLogRecPtr confirmed_flush
Definition: slot.h:104
ReplicationSlotPersistency persistency
Definition: slot.h:74
TransactionId effective_catalog_xmin
Definition: slot.h:175
Definition: c.h:741
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3504
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AmLogicalSlotSyncWorkerProcess, Assert, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, RemoteSlot::confirmed_lsn, ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::effective_catalog_xmin, ereport, errcode(), errdetail_internal(), errmsg(), errmsg_internal(), ERROR, RemoteSlot::failover, GetOldestSafeDecodingTransactionId(), GetStandbyFlushRecPtr(), RemoteSlot::invalidated, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LOG, LSN_FORMAT_ARGS, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, RemoteSlot::name, namestrcpy(), ReplicationSlotPersistentData::persistency, RemoteSlot::plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotAcquire(), ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredXmin(), reserve_wal_for_local_slot(), RemoteSlot::restart_lsn, RS_INVAL_NONE, RS_TEMPORARY, SearchNamedReplicationSlot(), SpinLockAcquire, SpinLockRelease, ReplicationSlotPersistentData::synced, RemoteSlot::two_phase, update_and_persist_local_synced_slot(), and update_local_synced_slot().

Referenced by synchronize_slots().

◆ synchronize_slots()

static bool synchronize_slots ( WalReceiverConn wrconn)
static

Definition at line 793 of file slotsync.c.

794 {
795 #define SLOTSYNC_COLUMN_COUNT 9
796  Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
797  LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
798 
800  TupleTableSlot *tupslot;
801  List *remote_slot_list = NIL;
802  bool some_slot_updated = false;
803  bool started_tx = false;
804  const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
805  " restart_lsn, catalog_xmin, two_phase, failover,"
806  " database, invalidation_reason"
807  " FROM pg_catalog.pg_replication_slots"
808  " WHERE failover and NOT temporary";
809 
811  if (SlotSyncCtx->syncing)
812  {
814  ereport(ERROR,
815  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
816  errmsg("cannot synchronize replication slots concurrently"));
817  }
818 
819  SlotSyncCtx->syncing = true;
821 
822  syncing_slots = true;
823 
824  /* The syscache access in walrcv_exec() needs a transaction env. */
825  if (!IsTransactionState())
826  {
828  started_tx = true;
829  }
830 
831  /* Execute the query */
832  res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
833  if (res->status != WALRCV_OK_TUPLES)
834  ereport(ERROR,
835  errmsg("could not fetch failover logical slots info from the primary server: %s",
836  res->err));
837 
838  /* Construct the remote_slot tuple and synchronize each slot locally */
839  tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
840  while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
841  {
842  bool isnull;
843  RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
844  Datum d;
845  int col = 0;
846 
847  remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
848  &isnull));
849  Assert(!isnull);
850 
851  remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
852  &isnull));
853  Assert(!isnull);
854 
855  /*
856  * It is possible to get null values for LSN and Xmin if slot is
857  * invalidated on the primary server, so handle accordingly.
858  */
859  d = slot_getattr(tupslot, ++col, &isnull);
860  remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
861  DatumGetLSN(d);
862 
863  d = slot_getattr(tupslot, ++col, &isnull);
864  remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
865 
866  d = slot_getattr(tupslot, ++col, &isnull);
867  remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
869 
870  remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
871  &isnull));
872  Assert(!isnull);
873 
874  remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
875  &isnull));
876  Assert(!isnull);
877 
878  remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
879  ++col, &isnull));
880  Assert(!isnull);
881 
882  d = slot_getattr(tupslot, ++col, &isnull);
883  remote_slot->invalidated = isnull ? RS_INVAL_NONE :
885 
886  /* Sanity check */
888 
889  /*
890  * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
891  * slot is valid, that means we have fetched the remote_slot in its
892  * RS_EPHEMERAL state. In such a case, don't sync it; we can always
893  * sync it in the next sync cycle when the remote_slot is persisted
894  * and has valid lsn(s) and xmin values.
895  *
896  * XXX: In future, if we plan to expose 'slot->data.persistency' in
897  * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
898  * slots in the first place.
899  */
900  if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
901  XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
902  !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
903  remote_slot->invalidated == RS_INVAL_NONE)
904  pfree(remote_slot);
905  else
906  /* Create list of remote slots */
907  remote_slot_list = lappend(remote_slot_list, remote_slot);
908 
909  ExecClearTuple(tupslot);
910  }
911 
912  /* Drop local slots that no longer need to be synced. */
913  drop_local_obsolete_slots(remote_slot_list);
914 
915  /* Now sync the slots locally */
916  foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
917  {
918  Oid remote_dbid = get_database_oid(remote_slot->database, false);
919 
920  /*
921  * Use shared lock to prevent a conflict with
922  * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
923  * a drop-database operation.
924  */
925  LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
926 
927  some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
928 
929  UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
930  }
931 
932  /* We are done, free remote_slot_list elements */
933  list_free_deep(remote_slot_list);
934 
936 
937  if (started_tx)
939 
941  SlotSyncCtx->syncing = false;
943 
944  syncing_slots = false;
945 
946  return some_slot_updated;
947 }
#define TextDatumGetCString(d)
Definition: builtins.h:98
Oid get_database_oid(const char *dbname, bool missing_ok)
Definition: dbcommands.c:3106
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1325
void list_free_deep(List *list)
Definition: list.c:1560
void * palloc0(Size size)
Definition: mcxt.c:1346
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
static TransactionId DatumGetTransactionId(Datum X)
Definition: postgres.h:262
unsigned int Oid
Definition: postgres_ext.h:31
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason)
Definition: slot.c:2390
static void drop_local_obsolete_slots(List *remote_slot_list)
Definition: slotsync.c:419
#define SLOTSYNC_COLUMN_COUNT
static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
Definition: slotsync.c:611
char * database
Definition: slotsync.c:141
TransactionId catalog_xmin
Definition: slotsync.c:146
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:395
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:206
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:468
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:462
bool IsTransactionState(void)
Definition: xact.c:384
void StartTransactionCommand(void)
Definition: xact.c:2995
void CommitTransactionCommand(void)
Definition: xact.c:3093
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessShareLock, Assert, RemoteSlot::catalog_xmin, CommitTransactionCommand(), RemoteSlot::confirmed_lsn, RemoteSlot::database, DatumGetBool(), DatumGetLSN(), DatumGetTransactionId(), drop_local_obsolete_slots(), ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), RemoteSlot::failover, foreach_ptr, get_database_oid(), GetSlotInvalidationCause(), RemoteSlot::invalidated, InvalidTransactionId, InvalidXLogRecPtr, IsTransactionState(), lappend(), list_free_deep(), LockSharedObject(), MakeSingleTupleTableSlot(), SlotSyncCtxStruct::mutex, RemoteSlot::name, NIL, palloc0(), pfree(), RemoteSlot::plugin, res, RemoteSlot::restart_lsn, RS_INVAL_NONE, slot_getattr(), SLOTSYNC_COLUMN_COUNT, SlotSyncCtx, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), synchronize_one_slot(), SlotSyncCtxStruct::syncing, syncing_slots, TextDatumGetCString, TransactionIdIsValid, TTSOpsMinimalTuple, tuplestore_gettupleslot(), RemoteSlot::two_phase, UnlockSharedObject(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, wrconn, and XLogRecPtrIsInvalid.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ SyncReplicationSlots()

void SyncReplicationSlots ( WalReceiverConn wrconn)

Definition at line 1633 of file slotsync.c.

1634 {
1636  {
1638 
1640  }
1642 }
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:52

References PG_END_ENSURE_ERROR_CLEANUP, PG_ENSURE_ERROR_CLEANUP, PointerGetDatum(), slotsync_failure_callback(), synchronize_slots(), validate_remote_info(), and wrconn.

Referenced by pg_sync_replication_slots().

◆ update_and_persist_local_synced_slot()

static bool update_and_persist_local_synced_slot ( RemoteSlot remote_slot,
Oid  remote_dbid 
)
static

Definition at line 547 of file slotsync.c.

548 {
550  bool found_consistent_snapshot = false;
551  bool remote_slot_precedes = false;
552 
553  (void) update_local_synced_slot(remote_slot, remote_dbid,
554  &found_consistent_snapshot,
555  &remote_slot_precedes);
556 
557  /*
558  * Check if the primary server has caught up. Refer to the comment atop
559  * the file for details on this check.
560  */
561  if (remote_slot_precedes)
562  {
563  /*
564  * The remote slot didn't catch up to locally reserved position.
565  *
566  * We do not drop the slot because the restart_lsn can be ahead of the
567  * current location when recreating the slot in the next cycle. It may
568  * take more time to create such a slot. Therefore, we keep this slot
569  * and attempt the synchronization in the next cycle.
570  */
571  return false;
572  }
573 
574  /*
575  * Don't persist the slot if it cannot reach the consistent point from the
576  * restart_lsn. See comments atop this file.
577  */
578  if (!found_consistent_snapshot)
579  {
580  ereport(LOG,
581  errmsg("could not sync slot \"%s\"", remote_slot->name),
582  errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
584 
585  return false;
586  }
587 
589 
590  ereport(LOG,
591  errmsg("newly created slot \"%s\" is sync-ready now",
592  remote_slot->name));
593 
594  return true;
595 }
int errdetail(const char *fmt,...)
Definition: elog.c:1205
void ReplicationSlotPersist(void)
Definition: slot.c:1023

References ReplicationSlot::data, ereport, errdetail(), errmsg(), LOG, LSN_FORMAT_ARGS, MyReplicationSlot, RemoteSlot::name, ReplicationSlotPersist(), ReplicationSlotPersistentData::restart_lsn, and update_local_synced_slot().

Referenced by synchronize_one_slot().

◆ update_local_synced_slot()

static bool update_local_synced_slot ( RemoteSlot remote_slot,
Oid  remote_dbid,
bool found_consistent_snapshot,
bool remote_slot_precedes 
)
static

Definition at line 170 of file slotsync.c.

173 {
175  bool updated_xmin_or_lsn = false;
176  bool updated_config = false;
177 
179 
180  if (found_consistent_snapshot)
181  *found_consistent_snapshot = false;
182 
183  if (remote_slot_precedes)
184  *remote_slot_precedes = false;
185 
186  /*
187  * Don't overwrite if we already have a newer catalog_xmin and
188  * restart_lsn.
189  */
190  if (remote_slot->restart_lsn < slot->data.restart_lsn ||
191  TransactionIdPrecedes(remote_slot->catalog_xmin,
192  slot->data.catalog_xmin))
193  {
194  /*
195  * This can happen in following situations:
196  *
197  * If the slot is temporary, it means either the initial WAL location
198  * reserved for the local slot is ahead of the remote slot's
199  * restart_lsn or the initial xmin_horizon computed for the local slot
200  * is ahead of the remote slot.
201  *
202  * If the slot is persistent, restart_lsn of the synced slot could
203  * still be ahead of the remote slot. Since we use slot advance
204  * functionality to keep snapbuild/slot updated, it is possible that
205  * the restart_lsn is advanced to a later position than it has on the
206  * primary. This can happen when slot advancing machinery finds
207  * running xacts record after reaching the consistent state at a later
208  * point than the primary where it serializes the snapshot and updates
209  * the restart_lsn.
210  *
211  * We LOG the message if the slot is temporary as it can help the user
212  * to understand why the slot is not sync-ready. In the case of a
213  * persistent slot, it would be a more common case and won't directly
214  * impact the users, so we used DEBUG1 level to log the message.
215  */
217  errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
218  remote_slot->name),
219  errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
220  LSN_FORMAT_ARGS(remote_slot->restart_lsn),
221  remote_slot->catalog_xmin,
223  slot->data.catalog_xmin));
224 
225  if (remote_slot_precedes)
226  *remote_slot_precedes = true;
227  }
228 
229  /*
230  * Attempt to sync LSNs and xmins only if remote slot is ahead of local
231  * slot.
232  */
233  else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
234  remote_slot->restart_lsn > slot->data.restart_lsn ||
235  TransactionIdFollows(remote_slot->catalog_xmin,
236  slot->data.catalog_xmin))
237  {
238  /*
239  * We can't directly copy the remote slot's LSN or xmin unless there
240  * exists a consistent snapshot at that point. Otherwise, after
241  * promotion, the slots may not reach a consistent point before the
242  * confirmed_flush_lsn which can lead to data loss. To avoid data
243  * loss, we let slot machinery advance the slot which ensures that
244  * snapbuilder/slot statuses are updated properly.
245  */
246  if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
247  {
248  /*
249  * Update the slot info directly if there is a serialized snapshot
250  * at the restart_lsn, as the slot can quickly reach consistency
251  * at restart_lsn by restoring the snapshot.
252  */
253  SpinLockAcquire(&slot->mutex);
254  slot->data.restart_lsn = remote_slot->restart_lsn;
255  slot->data.confirmed_flush = remote_slot->confirmed_lsn;
256  slot->data.catalog_xmin = remote_slot->catalog_xmin;
257  SpinLockRelease(&slot->mutex);
258 
259  if (found_consistent_snapshot)
260  *found_consistent_snapshot = true;
261  }
262  else
263  {
265  found_consistent_snapshot);
266 
267  /* Sanity check */
268  if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
269  ereport(ERROR,
270  errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
271  remote_slot->name),
272  errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
273  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
275  }
276 
277  updated_xmin_or_lsn = true;
278  }
279 
280  if (remote_dbid != slot->data.database ||
281  remote_slot->two_phase != slot->data.two_phase ||
282  remote_slot->failover != slot->data.failover ||
283  strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
284  {
285  NameData plugin_name;
286 
287  /* Avoid expensive operations while holding a spinlock. */
288  namestrcpy(&plugin_name, remote_slot->plugin);
289 
290  SpinLockAcquire(&slot->mutex);
291  slot->data.plugin = plugin_name;
292  slot->data.database = remote_dbid;
293  slot->data.two_phase = remote_slot->two_phase;
294  slot->data.failover = remote_slot->failover;
295  SpinLockRelease(&slot->mutex);
296 
297  updated_config = true;
298  }
299 
300  /*
301  * We have to write the changed xmin to disk *before* we change the
302  * in-memory value, otherwise after a crash we wouldn't know that some
303  * catalog tuples might have been removed already.
304  */
305  if (updated_config || updated_xmin_or_lsn)
306  {
309  }
310 
311  /*
312  * Now the new xmin is safely on disk, we can let the global value
313  * advance. We do not take ProcArrayLock or similar since we only advance
314  * xmin here and there's not much harm done by a concurrent computation
315  * missing that.
316  */
317  if (updated_xmin_or_lsn)
318  {
319  SpinLockAcquire(&slot->mutex);
320  slot->effective_catalog_xmin = remote_slot->catalog_xmin;
321  SpinLockRelease(&slot->mutex);
322 
325  }
326 
327  return updated_config || updated_xmin_or_lsn;
328 }
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
Definition: logical.c:2060
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
Definition: snapbuild.c:2142
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
Definition: transam.c:314

References Assert, RemoteSlot::catalog_xmin, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, RemoteSlot::confirmed_lsn, ReplicationSlot::data, ReplicationSlotPersistentData::database, DEBUG1, ReplicationSlot::effective_catalog_xmin, ereport, errdetail(), errdetail_internal(), errmsg(), errmsg_internal(), ERROR, RemoteSlot::failover, ReplicationSlotPersistentData::failover, ReplicationSlotPersistentData::invalidated, LOG, LogicalSlotAdvanceAndCheckSnapState(), LSN_FORMAT_ARGS, ReplicationSlot::mutex, MyReplicationSlot, RemoteSlot::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, RemoteSlot::plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotMarkDirty(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RemoteSlot::restart_lsn, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_TEMPORARY, SnapBuildSnapshotExists(), SpinLockAcquire, SpinLockRelease, TransactionIdFollows(), TransactionIdPrecedes(), RemoteSlot::two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by synchronize_one_slot(), and update_and_persist_local_synced_slot().

◆ update_synced_slots_inactive_since()

static void update_synced_slots_inactive_since ( void  )
static

Definition at line 1446 of file slotsync.c.

1447 {
1448  TimestampTz now = 0;
1449 
1450  /*
1451  * We need to update inactive_since only when we are promoting standby to
1452  * correctly interpret the inactive_since if the standby gets promoted
1453  * without a restart. We don't want the slots to appear inactive for a
1454  * long time after promotion if they haven't been synchronized recently.
1455  * Whoever acquires the slot, i.e., makes the slot active, will reset it.
1456  */
1457  if (!StandbyMode)
1458  return;
1459 
1460  /* The slot sync worker mustn't be running by now */
1462 
1463  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1464 
1465  for (int i = 0; i < max_replication_slots; i++)
1466  {
1468 
1469  /* Check if it is a synchronized slot */
1470  if (s->in_use && s->data.synced)
1471  {
1472  Assert(SlotIsLogical(s));
1473 
1474  /* Use the same inactive_since time for all the slots. */
1475  if (now == 0)
1477 
1478  SpinLockAcquire(&s->mutex);
1479  s->inactive_since = now;
1480  SpinLockRelease(&s->mutex);
1481  }
1482  }
1483 
1484  LWLockRelease(ReplicationSlotControlLock);
1485 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
int64 TimestampTz
Definition: timestamp.h:39
TimestampTz inactive_since
Definition: slot.h:206
bool StandbyMode
Definition: xlogrecovery.c:147

References Assert, ReplicationSlot::data, GetCurrentTimestamp(), i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, InvalidPid, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, now(), SlotSyncCtxStruct::pid, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SlotSyncCtx, SpinLockAcquire, SpinLockRelease, StandbyMode, and ReplicationSlotPersistentData::synced.

Referenced by ShutDownSlotSync().

◆ validate_remote_info()

static void validate_remote_info ( WalReceiverConn wrconn)
static

Definition at line 956 of file slotsync.c.

957 {
958 #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
960  Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
961  StringInfoData cmd;
962  bool isnull;
963  TupleTableSlot *tupslot;
964  bool remote_in_recovery;
965  bool primary_slot_valid;
966  bool started_tx = false;
967 
968  initStringInfo(&cmd);
969  appendStringInfo(&cmd,
970  "SELECT pg_is_in_recovery(), count(*) = 1"
971  " FROM pg_catalog.pg_replication_slots"
972  " WHERE slot_type='physical' AND slot_name=%s",
974 
975  /* The syscache access in walrcv_exec() needs a transaction env. */
976  if (!IsTransactionState())
977  {
979  started_tx = true;
980  }
981 
983  pfree(cmd.data);
984 
985  if (res->status != WALRCV_OK_TUPLES)
986  ereport(ERROR,
987  errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s",
988  PrimarySlotName, res->err),
989  errhint("Check if primary_slot_name is configured correctly."));
990 
991  tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
992  if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
993  elog(ERROR,
994  "failed to fetch tuple for the primary server slot specified by primary_slot_name");
995 
996  remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
997  Assert(!isnull);
998 
999  /*
1000  * Slot sync is currently not supported on a cascading standby. This is
1001  * because if we allow it, the primary server needs to wait for all the
1002  * cascading standbys, otherwise, logical subscribers can still be ahead
1003  * of one of the cascading standbys which we plan to promote. Thus, to
1004  * avoid this additional complexity, we restrict it for the time being.
1005  */
1006  if (remote_in_recovery)
1007  ereport(ERROR,
1008  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1009  errmsg("cannot synchronize replication slots from a standby server"));
1010 
1011  primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
1012  Assert(!isnull);
1013 
1014  if (!primary_slot_valid)
1015  ereport(ERROR,
1016  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1017  errmsg("slot synchronization requires valid primary_slot_name"),
1018  /* translator: second %s is a GUC variable name */
1019  errdetail("The replication slot \"%s\" specified by %s does not exist on the primary server.",
1020  PrimarySlotName, "primary_slot_name"));
1021 
1022  ExecClearTuple(tupslot);
1024 
1025  if (started_tx)
1027 }
int errhint(const char *fmt,...)
Definition: elog.c:1319
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
#define PRIMARY_INFO_OUTPUT_COL_COUNT

References appendStringInfo(), Assert, CommitTransactionCommand(), StringInfoData::data, DatumGetBool(), elog, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, ExecClearTuple(), initStringInfo(), IsTransactionState(), MakeSingleTupleTableSlot(), pfree(), PRIMARY_INFO_OUTPUT_COL_COUNT, PrimarySlotName, quote_literal_cstr(), res, slot_getattr(), StartTransactionCommand(), TTSOpsMinimalTuple, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and wrconn.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ ValidateSlotSyncParams()

bool ValidateSlotSyncParams ( int  elevel)

Definition at line 1061 of file slotsync.c.

1062 {
1063  /*
1064  * Logical slot sync/creation requires wal_level >= logical.
1065  *
1066  * Since altering the wal_level requires a server restart, error out in
1067  * this case regardless of the elevel provided by the caller.
1068  */
1070  ereport(ERROR,
1071  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1072  errmsg("slot synchronization requires wal_level >= \"logical\""));
1073 
1074  /*
1075  * A physical replication slot (primary_slot_name) is required on the
1076  * primary to ensure that the rows needed by the standby are not removed
1077  * after restarting, so that the synchronized slot on the standby will not
1078  * be invalidated.
1079  */
1080  if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
1081  {
1082  ereport(elevel,
1083  /* translator: %s is a GUC variable name */
1084  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1085  errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
1086  return false;
1087  }
1088 
1089  /*
1090  * hot_standby_feedback must be enabled to cooperate with the physical
1091  * replication slot, which allows informing the primary about the xmin and
1092  * catalog_xmin values on the standby.
1093  */
1094  if (!hot_standby_feedback)
1095  {
1096  ereport(elevel,
1097  /* translator: %s is a GUC variable name */
1098  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1099  errmsg("slot synchronization requires %s to be enabled",
1100  "hot_standby_feedback"));
1101  return false;
1102  }
1103 
1104  /*
1105  * primary_conninfo is required to connect to the primary and fetch
1106  * slot information.
1107  */
1108  if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
1109  {
1110  ereport(elevel,
1111  /* translator: %s is a GUC variable name */
1112  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1113  errmsg("slot synchronization requires %s to be defined",
1114  "primary_conninfo"));
1115  return false;
1116  }
1117 
1118  return true;
1119 }
int wal_level
Definition: xlog.c:131
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:74

References ereport, errcode(), errmsg(), ERROR, hot_standby_feedback, PrimaryConnInfo, PrimarySlotName, wal_level, and WAL_LEVEL_LOGICAL.

Referenced by MaybeStartSlotSyncWorker(), and pg_sync_replication_slots().

◆ wait_for_slot_activity()

static void wait_for_slot_activity ( bool  some_slot_updated)
static

Definition at line 1215 of file slotsync.c.

1216 {
1217  int rc;
1218 
1219  if (!some_slot_updated)
1220  {
1221  /*
1222  * No slots were updated, so double the sleep time, but not beyond the
1223  * maximum allowable value (MAX_SLOTSYNC_WORKER_NAPTIME_MS).
1224  */
1226  }
1227  else
1228  {
1229  /*
1230  * Some slots were updated since the last sleep, so reset the sleep
1231  * time to its minimum (presumably MIN_SLOTSYNC_WORKER_NAPTIME_MS).
1232  */
1234  }
1235 
1236  rc = WaitLatch(MyLatch,
1238  sleep_ms,
1239  WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1240 
1241  if (rc & WL_LATCH_SET)
1243 }
#define Min(x, y)
Definition: c.h:1004
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS
Definition: slotsync.c:118
static long sleep_ms
Definition: slotsync.c:121
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS
Definition: slotsync.c:119

References MAX_SLOTSYNC_WORKER_NAPTIME_MS, Min, MIN_SLOTSYNC_WORKER_NAPTIME_MS, MyLatch, ResetLatch(), sleep_ms, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by ReplSlotSyncWorkerMain().

Variable Documentation

◆ sleep_ms

long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS
static

Definition at line 121 of file slotsync.c.

Referenced by do_watch(), and wait_for_slot_activity().

◆ SlotSyncCtx

◆ sync_replication_slots

bool sync_replication_slots = false

Definition at line 111 of file slotsync.c.

Referenced by MaybeStartSlotSyncWorker(), and slotsync_reread_config().

◆ syncing_slots

bool syncing_slots = false
static