PostgreSQL Source Code  git master
slotsync.c File Reference
#include "postgres.h"
#include <time.h>
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
#include "libpq/pqsignal.h"
#include "pgstat.h"
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/snapbuild.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
Include dependency graph for slotsync.c:

Go to the source code of this file.

Data Structures

struct  SlotSyncCtxStruct
 
struct  RemoteSlot
 

Macros

#define MIN_SLOTSYNC_WORKER_NAPTIME_MS   200
 
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS   30000 /* 30s */
 
#define SLOTSYNC_RESTART_INTERVAL_SEC   10
 
#define SLOTSYNC_COLUMN_COUNT   9
 
#define PRIMARY_INFO_OUTPUT_COL_COUNT   2
 

Typedefs

typedef struct SlotSyncCtxStruct SlotSyncCtxStruct
 
typedef struct RemoteSlot RemoteSlot
 

Functions

static void slotsync_failure_callback (int code, Datum arg)
 
static void update_synced_slots_inactive_since (void)
 
static bool update_local_synced_slot (RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
 
static List * get_local_synced_slots (void)
 
static bool local_sync_slot_required (ReplicationSlot *local_slot, List *remote_slots)
 
static void drop_local_obsolete_slots (List *remote_slot_list)
 
static void reserve_wal_for_local_slot (XLogRecPtr restart_lsn)
 
static bool update_and_persist_local_synced_slot (RemoteSlot *remote_slot, Oid remote_dbid)
 
static bool synchronize_one_slot (RemoteSlot *remote_slot, Oid remote_dbid)
 
static bool synchronize_slots (WalReceiverConn *wrconn)
 
static void validate_remote_info (WalReceiverConn *wrconn)
 
char * CheckAndGetDbnameFromConninfo (void)
 
bool ValidateSlotSyncParams (int elevel)
 
static void slotsync_reread_config (void)
 
static void ProcessSlotSyncInterrupts (WalReceiverConn *wrconn)
 
static void slotsync_worker_disconnect (int code, Datum arg)
 
static void slotsync_worker_onexit (int code, Datum arg)
 
static void wait_for_slot_activity (bool some_slot_updated)
 
static void check_and_set_sync_info (pid_t worker_pid)
 
static void reset_syncing_flag ()
 
void ReplSlotSyncWorkerMain (char *startup_data, size_t startup_data_len)
 
void ShutDownSlotSync (void)
 
bool SlotSyncWorkerCanRestart (void)
 
bool IsSyncingReplicationSlots (void)
 
Size SlotSyncShmemSize (void)
 
void SlotSyncShmemInit (void)
 
void SyncReplicationSlots (WalReceiverConn *wrconn)
 

Variables

static SlotSyncCtxStruct * SlotSyncCtx = NULL
 
bool sync_replication_slots = false
 
static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS
 
static bool syncing_slots = false
 

Macro Definition Documentation

◆ MAX_SLOTSYNC_WORKER_NAPTIME_MS

#define MAX_SLOTSYNC_WORKER_NAPTIME_MS   30000 /* 30s */

Definition at line 117 of file slotsync.c.

◆ MIN_SLOTSYNC_WORKER_NAPTIME_MS

#define MIN_SLOTSYNC_WORKER_NAPTIME_MS   200

Definition at line 116 of file slotsync.c.

◆ PRIMARY_INFO_OUTPUT_COL_COUNT

#define PRIMARY_INFO_OUTPUT_COL_COUNT   2

◆ SLOTSYNC_COLUMN_COUNT

#define SLOTSYNC_COLUMN_COUNT   9

◆ SLOTSYNC_RESTART_INTERVAL_SEC

#define SLOTSYNC_RESTART_INTERVAL_SEC   10

Definition at line 122 of file slotsync.c.

Typedef Documentation

◆ RemoteSlot

typedef struct RemoteSlot RemoteSlot

◆ SlotSyncCtxStruct

typedef struct SlotSyncCtxStruct SlotSyncCtxStruct

Function Documentation

◆ check_and_set_sync_info()

static void check_and_set_sync_info ( pid_t  worker_pid)
static

Definition at line 1271 of file slotsync.c.

1272 {
1274 
1275  /* The worker pid must not be already assigned in SlotSyncCtx */
1276  Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
1277 
1278  /*
1279  * Emit an error if startup process signaled the slot sync machinery to
1280  * stop. See comments atop SlotSyncCtxStruct.
1281  */
1283  {
1285  ereport(ERROR,
1286  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1287  errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
1288  }
1289 
1290  if (SlotSyncCtx->syncing)
1291  {
1293  ereport(ERROR,
1294  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1295  errmsg("cannot synchronize replication slots concurrently"));
1296  }
1297 
1298  SlotSyncCtx->syncing = true;
1299 
1300  /*
1301  * Advertise the required PID so that the startup process can kill the
1302  * slot sync worker on promotion.
1303  */
1304  SlotSyncCtx->pid = worker_pid;
1305 
1307 
1308  syncing_slots = true;
1309 }
#define Assert(condition)
Definition: c.h:858
int errcode(int sqlerrcode)
Definition: elog.c:855
int errmsg(const char *fmt,...)
Definition: elog.c:1068
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define InvalidPid
Definition: miscadmin.h:32
static SlotSyncCtxStruct * SlotSyncCtx
Definition: slotsync.c:106
static bool syncing_slots
Definition: slotsync.c:129
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62

References Assert, ereport, errcode(), errmsg(), ERROR, InvalidPid, SlotSyncCtxStruct::mutex, SlotSyncCtxStruct::pid, SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::stopSignaled, SlotSyncCtxStruct::syncing, and syncing_slots.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ CheckAndGetDbnameFromConninfo()

char* CheckAndGetDbnameFromConninfo ( void  )

Definition at line 1013 of file slotsync.c.

1014 {
1015  char *dbname;
1016 
1017  /*
1018  * The slot synchronization needs a database connection for walrcv_exec to
1019  * work.
1020  */
1022  if (dbname == NULL)
1023  ereport(ERROR,
1024 
1025  /*
1026  * translator: dbname is a specific option; %s is a GUC variable name
1027  */
1028  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1029  errmsg("slot synchronization requires dbname to be specified in %s",
1030  "primary_conninfo"));
1031  return dbname;
1032 }
char * dbname
Definition: streamutil.c:52
#define walrcv_get_dbname_from_conninfo(conninfo)
Definition: walreceiver.h:442
char * PrimaryConnInfo
Definition: xlogrecovery.c:96

References dbname, ereport, errcode(), errmsg(), ERROR, PrimaryConnInfo, and walrcv_get_dbname_from_conninfo.

Referenced by pg_sync_replication_slots(), and ReplSlotSyncWorkerMain().

◆ drop_local_obsolete_slots()

static void drop_local_obsolete_slots ( List * remote_slot_list)
static

Definition at line 417 of file slotsync.c.

418 {
419  List *local_slots = get_local_synced_slots();
420 
421  foreach_ptr(ReplicationSlot, local_slot, local_slots)
422  {
423  /* Drop the local slot if it is not required to be retained. */
424  if (!local_sync_slot_required(local_slot, remote_slot_list))
425  {
426  bool synced_slot;
427 
428  /*
429  * Use shared lock to prevent a conflict with
430  * ReplicationSlotsDropDBSlots(), trying to drop the same slot
431  * during a drop-database operation.
432  */
433  LockSharedObject(DatabaseRelationId, local_slot->data.database,
434  0, AccessShareLock);
435 
436  /*
437  * In the small window between getting the slot to drop and
438  * locking the database, there is a possibility of a parallel
439  * database drop by the startup process and the creation of a new
440  * slot by the user. This new user-created slot may end up using
441  * the same shared memory as that of 'local_slot'. Thus check if
442  * local_slot is still the synced one before performing actual
443  * drop.
444  */
445  SpinLockAcquire(&local_slot->mutex);
446  synced_slot = local_slot->in_use && local_slot->data.synced;
447  SpinLockRelease(&local_slot->mutex);
448 
449  if (synced_slot)
450  {
451  ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
453  }
454 
455  UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
456  0, AccessShareLock);
457 
458  ereport(LOG,
459  errmsg("dropped replication slot \"%s\" of dbid %u",
460  NameStr(local_slot->data.name),
461  local_slot->data.database));
462  }
463  }
464 }
#define NameStr(name)
Definition: c.h:746
#define LOG
Definition: elog.h:31
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1073
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1132
#define AccessShareLock
Definition: lockdefs.h:36
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
void ReplicationSlotDropAcquired(void)
Definition: slot.c:868
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:540
static bool local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
Definition: slotsync.c:364
static List * get_local_synced_slots(void)
Definition: slotsync.c:333
Definition: pg_list.h:54

References AccessShareLock, ereport, errmsg(), foreach_ptr, get_local_synced_slots(), local_sync_slot_required(), LockSharedObject(), LOG, NameStr, ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), SpinLockAcquire, SpinLockRelease, and UnlockSharedObject().

Referenced by synchronize_slots().

◆ get_local_synced_slots()

static List* get_local_synced_slots ( void  )
static

Definition at line 333 of file slotsync.c.

334 {
335  List *local_slots = NIL;
336 
337  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
338 
339  for (int i = 0; i < max_replication_slots; i++)
340  {
342 
343  /* Check if it is a synchronized slot */
344  if (s->in_use && s->data.synced)
345  {
346  Assert(SlotIsLogical(s));
347  local_slots = lappend(local_slots, s);
348  }
349  }
350 
351  LWLockRelease(ReplicationSlotControlLock);
352 
353  return local_slots;
354 }
int i
Definition: isn.c:73
List * lappend(List *list, void *datum)
Definition: list.c:339
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:115
#define NIL
Definition: pg_list.h:68
int max_replication_slots
Definition: slot.c:141
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:135
#define SlotIsLogical(slot)
Definition: slot.h:210
ReplicationSlot replication_slots[1]
Definition: slot.h:221
bool in_use
Definition: slot.h:154
ReplicationSlotPersistentData data
Definition: slot.h:178

References Assert, ReplicationSlot::data, i, ReplicationSlot::in_use, lappend(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, NIL, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, and ReplicationSlotPersistentData::synced.

Referenced by drop_local_obsolete_slots().

◆ IsSyncingReplicationSlots()

bool IsSyncingReplicationSlots ( void  )

Definition at line 1651 of file slotsync.c.

1652 {
1653  return syncing_slots;
1654 }

References syncing_slots.

Referenced by CreateDecodingContext(), GetStandbyFlushRecPtr(), and ReplicationSlotCreate().

◆ local_sync_slot_required()

static bool local_sync_slot_required ( ReplicationSlot * local_slot,
List * remote_slots 
)
static

Definition at line 364 of file slotsync.c.

365 {
366  bool remote_exists = false;
367  bool locally_invalidated = false;
368 
369  foreach_ptr(RemoteSlot, remote_slot, remote_slots)
370  {
371  if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
372  {
373  remote_exists = true;
374 
375  /*
376  * If remote slot is not invalidated but local slot is marked as
377  * invalidated, then set locally_invalidated flag.
378  */
379  SpinLockAcquire(&local_slot->mutex);
380  locally_invalidated =
381  (remote_slot->invalidated == RS_INVAL_NONE) &&
382  (local_slot->data.invalidated != RS_INVAL_NONE);
383  SpinLockRelease(&local_slot->mutex);
384 
385  break;
386  }
387  }
388 
389  return (remote_exists && !locally_invalidated);
390 }
@ RS_INVAL_NONE
Definition: slot.h:49
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:96
slock_t mutex
Definition: slot.h:151

References ReplicationSlot::data, foreach_ptr, ReplicationSlotPersistentData::invalidated, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, RS_INVAL_NONE, SpinLockAcquire, and SpinLockRelease.

Referenced by drop_local_obsolete_slots().

◆ ProcessSlotSyncInterrupts()

static void ProcessSlotSyncInterrupts ( WalReceiverConn * wrconn)
static

Definition at line 1155 of file slotsync.c.

1156 {
1158 
1160  {
1161  ereport(LOG,
1162  errmsg("slot sync worker is shutting down on receiving SIGINT"));
1163 
1164  proc_exit(0);
1165  }
1166 
1167  if (ConfigReloadPending)
1169 }
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void proc_exit(int code)
Definition: ipc.c:104
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static void slotsync_reread_config(void)
Definition: slotsync.c:1106

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, ereport, errmsg(), LOG, proc_exit(), ShutdownRequestPending, and slotsync_reread_config().

Referenced by ReplSlotSyncWorkerMain().

◆ ReplSlotSyncWorkerMain()

void ReplSlotSyncWorkerMain ( char *  startup_data,
size_t  startup_data_len 
)

Definition at line 1331 of file slotsync.c.

1332 {
1333  WalReceiverConn *wrconn = NULL;
1334  char *dbname;
1335  char *err;
1336  sigjmp_buf local_sigjmp_buf;
1337  StringInfoData app_name;
1338 
1339  Assert(startup_data_len == 0);
1340 
1342 
1343  init_ps_display(NULL);
1344 
1346 
1347  /*
1348  * Create a per-backend PGPROC struct in shared memory. We must do this
1349  * before we access any shared memory.
1350  */
1351  InitProcess();
1352 
1353  /*
1354  * Early initialization.
1355  */
1356  BaseInit();
1357 
1358  Assert(SlotSyncCtx != NULL);
1359 
1360  /*
1361  * If an exception is encountered, processing resumes here.
1362  *
1363  * We just need to clean up, report the error, and go away.
1364  *
1365  * If we do not have this handling here, then since this worker process
1366  * operates at the bottom of the exception stack, ERRORs turn into FATALs.
1367  * Therefore, we create our own exception handler to catch ERRORs.
1368  */
1369  if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1370  {
1371  /* since not using PG_TRY, must reset error stack by hand */
1372  error_context_stack = NULL;
1373 
1374  /* Prevents interrupts while cleaning up */
1375  HOLD_INTERRUPTS();
1376 
1377  /* Report the error to the server log */
1378  EmitErrorReport();
1379 
1380  /*
1381  * We can now go away. Note that because we called InitProcess, a
1382  * callback was registered to do ProcKill, which will clean up
1383  * necessary state.
1384  */
1385  proc_exit(0);
1386  }
1387 
1388  /* We can now handle ereport(ERROR) */
1389  PG_exception_stack = &local_sigjmp_buf;
1390 
1391  /* Setup signal handling */
1394  pqsignal(SIGTERM, die);
1400 
1402 
1403  ereport(LOG, errmsg("slot sync worker started"));
1404 
1405  /* Register it as soon as SlotSyncCtx->pid is initialized. */
1407 
1408  /*
1409  * Establishes SIGALRM handler and initialize timeout module. It is needed
1410  * by InitPostgres to register different timeouts.
1411  */
1413 
1414  /* Load the libpq-specific functions */
1415  load_file("libpqwalreceiver", false);
1416 
1417  /*
1418  * Unblock signals (they were blocked when the postmaster forked us)
1419  */
1420  sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
1421 
1422  /*
1423  * Set always-secure search path, so malicious users can't redirect user
1424  * code (e.g. operators).
1425  *
1426  * It's not strictly necessary since we won't be scanning or writing to
1427  * any user table locally, but it's good to retain it here for added
1428  * precaution.
1429  */
1430  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
1431 
1433 
1434  /*
1435  * Connect to the database specified by the user in primary_conninfo. We
1436  * need a database connection for walrcv_exec to work which we use to
1437  * fetch slot information from the remote node. See comments atop
1438  * libpqrcv_exec.
1439  *
1440  * We do not specify a specific user here since the slot sync worker will
1441  * operate as a superuser. This is safe because the slot sync worker does
1442  * not interact with user tables, eliminating the risk of executing
1443  * arbitrary code within triggers.
1444  */
1445  InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
1446 
1448 
1449  initStringInfo(&app_name);
1450  if (cluster_name[0])
1451  appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
1452  else
1453  appendStringInfoString(&app_name, "slotsync worker");
1454 
1455  /*
1456  * Establish the connection to the primary server for slot
1457  * synchronization.
1458  */
1459  wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
1460  app_name.data, &err);
1461  pfree(app_name.data);
1462 
1463  if (!wrconn)
1464  ereport(ERROR,
1465  errcode(ERRCODE_CONNECTION_FAILURE),
1466  errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
1467  app_name.data, err));
1468 
1469  /*
1470  * Register the disconnection callback.
1471  *
1472  * XXX: This can be combined with previous cleanup registration of
1473  * slotsync_worker_onexit() but that will need the connection to be made
1474  * global and we want to avoid introducing global for this purpose.
1475  */
1477 
1478  /*
1479  * Using the specified primary server connection, check that we are not a
1480  * cascading standby and slot configured in 'primary_slot_name' exists on
1481  * the primary server.
1482  */
1484 
1485  /* Main loop to synchronize slots */
1486  for (;;)
1487  {
1488  bool some_slot_updated = false;
1489 
1491 
1492  some_slot_updated = synchronize_slots(wrconn);
1493 
1494  wait_for_slot_activity(some_slot_updated);
1495  }
1496 
1497  /*
1498  * The slot sync worker can't get here because it will only stop when it
1499  * receives a SIGINT from the startup process, or when there is an error.
1500  */
1501  Assert(false);
1502 }
sigset_t UnBlockSig
Definition: pqsignal.c:22
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
void EmitErrorReport(void)
Definition: elog.c:1668
ErrorContextCallback * error_context_stack
Definition: elog.c:94
sigjmp_buf * PG_exception_stack
Definition: elog.c:96
void err(int eval, const char *fmt,...)
Definition: err.c:43
int MyProcPid
Definition: globals.c:46
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4285
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
char * cluster_name
Definition: guc_tables.c:541
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:105
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void pfree(void *pointer)
Definition: mcxt.c:1521
@ NormalProcessing
Definition: miscadmin.h:449
@ InitProcessing
Definition: miscadmin.h:448
#define GetProcessingMode()
Definition: miscadmin.h:458
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
#define SetProcessingMode(mode)
Definition: miscadmin.h:460
@ B_SLOTSYNC_WORKER
Definition: miscadmin.h:343
BackendType MyBackendType
Definition: miscinit.c:63
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void FloatExceptionHandler(SIGNAL_ARGS)
Definition: postgres.c:3019
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
void BaseInit(void)
Definition: postinit.c:602
void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname)
Definition: postinit.c:693
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:635
void init_ps_display(const char *fixed_part)
Definition: ps_status.c:267
static void slotsync_worker_disconnect(int code, Datum arg)
Definition: slotsync.c:1177
static bool synchronize_slots(WalReceiverConn *wrconn)
Definition: slotsync.c:791
static void wait_for_slot_activity(bool some_slot_updated)
Definition: slotsync.c:1236
char * CheckAndGetDbnameFromConninfo(void)
Definition: slotsync.c:1013
static void slotsync_worker_onexit(int code, Datum arg)
Definition: slotsync.c:1190
static void check_and_set_sync_info(pid_t worker_pid)
Definition: slotsync.c:1271
static void validate_remote_info(WalReceiverConn *wrconn)
Definition: slotsync.c:934
static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
Definition: slotsync.c:1155
void InitProcess(void)
Definition: proc.c:296
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void InitializeTimeouts(void)
Definition: timeout.c:470
static WalReceiverConn * wrconn
Definition: walreceiver.c:92
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:432
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References appendStringInfo(), appendStringInfoString(), Assert, B_SLOTSYNC_WORKER, BaseInit(), before_shmem_exit(), check_and_set_sync_info(), CheckAndGetDbnameFromConninfo(), cluster_name, StringInfoData::data, dbname, die, EmitErrorReport(), ereport, err(), errcode(), errmsg(), ERROR, error_context_stack, FloatExceptionHandler(), GetProcessingMode, HOLD_INTERRUPTS, init_ps_display(), InitializeTimeouts(), InitPostgres(), InitProcess(), InitProcessing, initStringInfo(), InvalidOid, load_file(), LOG, MyBackendType, MyProcPid, NormalProcessing, pfree(), PG_exception_stack, PGC_S_OVERRIDE, PGC_SUSET, PointerGetDatum(), pqsignal(), PrimaryConnInfo, proc_exit(), ProcessSlotSyncInterrupts(), procsignal_sigusr1_handler(), SetConfigOption(), SetProcessingMode, SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, slotsync_worker_disconnect(), slotsync_worker_onexit(), SlotSyncCtx, synchronize_slots(), UnBlockSig, validate_remote_info(), wait_for_slot_activity(), walrcv_connect, and wrconn.

◆ reserve_wal_for_local_slot()

static void reserve_wal_for_local_slot ( XLogRecPtr  restart_lsn)
static

Definition at line 474 of file slotsync.c.

475 {
476  XLogSegNo oldest_segno;
477  XLogSegNo segno;
479 
480  Assert(slot != NULL);
482 
483  while (true)
484  {
485  SpinLockAcquire(&slot->mutex);
486  slot->data.restart_lsn = restart_lsn;
487  SpinLockRelease(&slot->mutex);
488 
489  /* Prevent WAL removal as fast as possible */
491 
493 
494  /*
495  * Find the oldest existing WAL segment file.
496  *
497  * Normally, we can determine it by using the last removed segment
498  * number. However, if no WAL segment files have been removed by a
499  * checkpoint since startup, we need to search for the oldest segment
500  * file from the current timeline existing in XLOGDIR.
501  *
502  * XXX: Currently, we are searching for the oldest segment in the
503  * current timeline as there is less chance of the slot's restart_lsn
504  * from being some prior timeline, and even if it happens, in the
505  * worst case, we will wait to sync till the slot's restart_lsn moved
506  * to the current timeline.
507  */
508  oldest_segno = XLogGetLastRemovedSegno() + 1;
509 
510  if (oldest_segno == 1)
511  {
512  TimeLineID cur_timeline;
513 
514  GetWalRcvFlushRecPtr(NULL, &cur_timeline);
515  oldest_segno = XLogGetOldestSegno(cur_timeline);
516  }
517 
518  elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
519  segno, oldest_segno);
520 
521  /*
522  * If all required WAL is still there, great, otherwise retry. The
523  * slot should prevent further removal of WAL, unless there's a
524  * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
525  * the new restart_lsn above, so normally we should never need to loop
526  * more than twice.
527  */
528  if (segno >= oldest_segno)
529  break;
530 
531  /* Retry using the location of the oldest wal segment */
532  XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
533  }
534 }
#define UINT64_FORMAT
Definition: c.h:549
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:224
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1105
XLogRecPtr restart_lsn
Definition: slot.h:93
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3747
int wal_segment_size
Definition: xlog.c:143
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
Definition: xlog.c:3763
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert, ReplicationSlot::data, DEBUG1, elog, GetWalRcvFlushRecPtr(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, SpinLockRelease, UINT64_FORMAT, wal_segment_size, XLByteToSeg, XLogGetLastRemovedSegno(), XLogGetOldestSegno(), XLogRecPtrIsInvalid, and XLogSegNoOffsetToRecPtr.

Referenced by synchronize_one_slot().

◆ reset_syncing_flag()

static void reset_syncing_flag ( )
static

◆ ShutDownSlotSync()

void ShutDownSlotSync ( void  )

Definition at line 1563 of file slotsync.c.

1564 {
1565  pid_t worker_pid;
1566 
1568 
1569  SlotSyncCtx->stopSignaled = true;
1570 
1571  /*
1572  * Return if neither the slot sync worker is running nor the function
1573  * pg_sync_replication_slots() is executing.
1574  */
1575  if (!SlotSyncCtx->syncing)
1576  {
1579  return;
1580  }
1581 
1582  worker_pid = SlotSyncCtx->pid;
1583 
1585 
1586  if (worker_pid != InvalidPid)
1587  kill(worker_pid, SIGINT);
1588 
1589  /* Wait for slot sync to end */
1590  for (;;)
1591  {
1592  int rc;
1593 
1594  /* Wait a bit, we don't expect to have to wait long */
1595  rc = WaitLatch(MyLatch,
1597  10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1598 
1599  if (rc & WL_LATCH_SET)
1600  {
1603  }
1604 
1606 
1607  /* Ensure that no process is syncing the slots. */
1608  if (!SlotSyncCtx->syncing)
1609  break;
1610 
1612  }
1613 
1615 
1617 }
struct Latch * MyLatch
Definition: globals.c:61
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static void update_synced_slots_inactive_since(void)
Definition: slotsync.c:1511
#define kill(pid, sig)
Definition: win32_port.h:485

References CHECK_FOR_INTERRUPTS, InvalidPid, kill, SlotSyncCtxStruct::mutex, MyLatch, SlotSyncCtxStruct::pid, ResetLatch(), SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::stopSignaled, SlotSyncCtxStruct::syncing, update_synced_slots_inactive_since(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by FinishWalRecovery().

◆ slotsync_failure_callback()

static void slotsync_failure_callback ( int  code,
Datum  arg 
)
static

Definition at line 1689 of file slotsync.c.

1690 {
1692 
1693  /*
1694  * We need to do slots cleanup here just like WalSndErrorCleanup() does.
1695  *
1696  * The startup process during promotion invokes ShutDownSlotSync() which
1697  * waits for slot sync to finish and it does that by checking the
1698  * 'syncing' flag. Thus the SQL function must be done with slots' release
1699  * and cleanup to avoid any dangling temporary slots or active slots
1700  * before it marks itself as finished syncing.
1701  */
1702 
1703  /* Make sure active replication slots are released */
1704  if (MyReplicationSlot != NULL)
1706 
1707  /* Also cleanup the synced temporary slots. */
1708  ReplicationSlotCleanup(true);
1709 
1710  /*
1711  * The set syncing_slots indicates that the process errored out without
1712  * resetting the flag. So, we need to clean up shared memory and reset the
1713  * flag here.
1714  */
1715  if (syncing_slots)
1717 
1719 }
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
void ReplicationSlotRelease(void)
Definition: slot.c:652
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
static void reset_syncing_flag()
Definition: slotsync.c:1315
#define walrcv_disconnect(conn)
Definition: walreceiver.h:464

References arg, DatumGetPointer(), MyReplicationSlot, ReplicationSlotCleanup(), ReplicationSlotRelease(), reset_syncing_flag(), syncing_slots, walrcv_disconnect, and wrconn.

Referenced by SyncReplicationSlots().

◆ slotsync_reread_config()

static void slotsync_reread_config ( void  )
static

Definition at line 1106 of file slotsync.c.

1107 {
1108  char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
1109  char *old_primary_slotname = pstrdup(PrimarySlotName);
1110  bool old_sync_replication_slots = sync_replication_slots;
1111  bool old_hot_standby_feedback = hot_standby_feedback;
1112  bool conninfo_changed;
1113  bool primary_slotname_changed;
1114 
1116 
1117  ConfigReloadPending = false;
1119 
1120  conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
1121  primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
1122  pfree(old_primary_conninfo);
1123  pfree(old_primary_slotname);
1124 
1125  if (old_sync_replication_slots != sync_replication_slots)
1126  {
1127  ereport(LOG,
1128  /* translator: %s is a GUC variable name */
1129  errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots"));
1130  proc_exit(0);
1131  }
1132 
1133  if (conninfo_changed ||
1134  primary_slotname_changed ||
1135  (old_hot_standby_feedback != hot_standby_feedback))
1136  {
1137  ereport(LOG,
1138  errmsg("slot sync worker will restart because of a parameter change"));
1139 
1140  /*
1141  * Reset the last-start time for this worker so that the postmaster
1142  * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
1143  */
1145 
1146  proc_exit(0);
1147  }
1148 
1149 }
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
char * pstrdup(const char *in)
Definition: mcxt.c:1696
bool sync_replication_slots
Definition: slotsync.c:109
time_t last_start_time
Definition: slotsync.c:102
bool hot_standby_feedback
Definition: walreceiver.c:89
char * PrimarySlotName
Definition: xlogrecovery.c:97

References Assert, ConfigReloadPending, ereport, errmsg(), hot_standby_feedback, SlotSyncCtxStruct::last_start_time, LOG, pfree(), PGC_SIGHUP, PrimaryConnInfo, PrimarySlotName, proc_exit(), ProcessConfigFile(), pstrdup(), SlotSyncCtx, and sync_replication_slots.

Referenced by ProcessSlotSyncInterrupts().

◆ slotsync_worker_disconnect()

static void slotsync_worker_disconnect ( int  code,
Datum  arg 
)
static

Definition at line 1177 of file slotsync.c.

1178 {
1180 
1182 }

References arg, DatumGetPointer(), walrcv_disconnect, and wrconn.

Referenced by ReplSlotSyncWorkerMain().

◆ slotsync_worker_onexit()

static void slotsync_worker_onexit ( int  code,
Datum  arg 
)
static

Definition at line 1190 of file slotsync.c.

1191 {
1192  /*
1193  * We need to do slots cleanup here just like WalSndErrorCleanup() does.
1194  *
1195  * The startup process during promotion invokes ShutDownSlotSync() which
1196  * waits for slot sync to finish and it does that by checking the
1197  * 'syncing' flag. Thus the slot sync worker must be done with slots'
1198  * release and cleanup to avoid any dangling temporary slots or active
1199  * slots before it marks itself as finished syncing.
1200  */
1201 
1202  /* Make sure active replication slots are released */
1203  if (MyReplicationSlot != NULL)
1204  ReplicationSlotRelease();
1205 
1206  /* Also cleanup the temporary slots. */
1207  ReplicationSlotCleanup(false);
1208 
1210 
1212 
1213  /*
1214  * If syncing_slots is true, it indicates that the process errored out
1215  * without resetting the flag. So, we need to clean up shared memory and
1216  * reset the flag here.
1217  */
1218  if (syncing_slots)
1219  {
1220  SlotSyncCtx->syncing = false;
1221  syncing_slots = false;
1222  }
1223 
1225 }

References InvalidPid, SlotSyncCtxStruct::mutex, MyReplicationSlot, SlotSyncCtxStruct::pid, ReplicationSlotCleanup(), ReplicationSlotRelease(), SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::syncing, and syncing_slots.

Referenced by ReplSlotSyncWorkerMain().

◆ SlotSyncShmemInit()

void SlotSyncShmemInit ( void  )

Definition at line 1669 of file slotsync.c.

1670 {
1672  bool found;
1673 
1675  ShmemInitStruct("Slot Sync Data", size, &found);
1676 
1677  if (!found)
1678  {
1679  memset(SlotSyncCtx, 0, size);
1682  }
1683 }
size_t Size
Definition: c.h:605
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
static pg_noinline void Size size
Definition: slab.c:607
Size SlotSyncShmemSize(void)
Definition: slotsync.c:1660
#define SpinLockInit(lock)
Definition: spin.h:60

References InvalidPid, SlotSyncCtxStruct::mutex, SlotSyncCtxStruct::pid, ShmemInitStruct(), size, SlotSyncCtx, SlotSyncShmemSize(), and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ SlotSyncShmemSize()

Size SlotSyncShmemSize ( void  )

Definition at line 1660 of file slotsync.c.

1661 {
1662  return sizeof(SlotSyncCtxStruct);
1663 }
struct SlotSyncCtxStruct SlotSyncCtxStruct

Referenced by CalculateShmemSize(), and SlotSyncShmemInit().

◆ SlotSyncWorkerCanRestart()

bool SlotSyncWorkerCanRestart ( void  )

Definition at line 1631 of file slotsync.c.

1632 {
1633  time_t curtime = time(NULL);
1634 
1635  /* Return false if too soon since last start. */
1636  if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
1637  (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
1638  return false;
1639 
1640  SlotSyncCtx->last_start_time = curtime;
1641 
1642  return true;
1643 }
#define SLOTSYNC_RESTART_INTERVAL_SEC
Definition: slotsync.c:122

References SlotSyncCtxStruct::last_start_time, SLOTSYNC_RESTART_INTERVAL_SEC, and SlotSyncCtx.

Referenced by MaybeStartSlotSyncWorker().

◆ synchronize_one_slot()

static bool synchronize_one_slot ( RemoteSlot remote_slot,
Oid  remote_dbid 
)
static

Definition at line 609 of file slotsync.c.

610 {
611  ReplicationSlot *slot;
612  XLogRecPtr latestFlushPtr;
613  bool slot_updated = false;
614 
615  /*
616  * Make sure that concerned WAL is received and flushed before syncing
617  * slot to target lsn received from the primary server.
618  */
619  latestFlushPtr = GetStandbyFlushRecPtr(NULL);
620  if (remote_slot->confirmed_lsn > latestFlushPtr)
621  {
622  /*
623  * Can get here only if GUC 'synchronized_standby_slots' on the
624  * primary server was not configured correctly.
625  */
627  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
628  errmsg("skipping slot synchronization as the received slot sync"
629  " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
630  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
631  remote_slot->name,
632  LSN_FORMAT_ARGS(latestFlushPtr)));
633 
634  return false;
635  }
636 
637  /* Search for the named slot */
638  if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
639  {
640  bool synced;
641 
642  SpinLockAcquire(&slot->mutex);
643  synced = slot->data.synced;
644  SpinLockRelease(&slot->mutex);
645 
646  /* User-created slot with the same name exists, raise ERROR. */
647  if (!synced)
648  ereport(ERROR,
649  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
650  errmsg("exiting from slot synchronization because same"
651  " name slot \"%s\" already exists on the standby",
652  remote_slot->name));
653 
654  /*
655  * The slot has been synchronized before.
656  *
657  * It is important to acquire the slot here before checking
658  * invalidation. If we don't acquire the slot first, there could be a
659  * race condition that the local slot could be invalidated just after
660  * checking the 'invalidated' flag here and we could end up
661  * overwriting 'invalidated' flag to remote_slot's value. See
662  * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
663  * if the slot is not acquired by other processes.
664  *
665  * XXX: If it ever turns out that slot acquire/release is costly for
666  * cases when none of the slot properties is changed then we can do a
667  * pre-check to ensure that at least one of the slot properties is
668  * changed before acquiring the slot.
669  */
670  ReplicationSlotAcquire(remote_slot->name, true);
671 
672  Assert(slot == MyReplicationSlot);
673 
674  /*
675  * Copy the invalidation cause from remote only if local slot is not
676  * invalidated locally, we don't want to overwrite existing one.
677  */
678  if (slot->data.invalidated == RS_INVAL_NONE &&
679  remote_slot->invalidated != RS_INVAL_NONE)
680  {
681  SpinLockAcquire(&slot->mutex);
682  slot->data.invalidated = remote_slot->invalidated;
683  SpinLockRelease(&slot->mutex);
684 
685  /* Make sure the invalidated state persists across server restart */
688 
689  slot_updated = true;
690  }
691 
692  /* Skip the sync of an invalidated slot */
693  if (slot->data.invalidated != RS_INVAL_NONE)
694  {
696  return slot_updated;
697  }
698 
699  /* Slot not ready yet, let's attempt to make it sync-ready now. */
700  if (slot->data.persistency == RS_TEMPORARY)
701  {
702  slot_updated = update_and_persist_local_synced_slot(remote_slot,
703  remote_dbid);
704  }
705 
706  /* Slot ready for sync, so sync it. */
707  else
708  {
709  /*
710  * Sanity check: As long as the invalidations are handled
711  * appropriately as above, this should never happen.
712  *
713  * We don't need to check restart_lsn here. See the comments in
714  * update_local_synced_slot() for details.
715  */
716  if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
717  ereport(ERROR,
718  errmsg_internal("cannot synchronize local slot \"%s\"",
719  remote_slot->name),
720  errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
722  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
723 
724  slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
725  NULL, NULL);
726  }
727  }
728  /* Otherwise create the slot first. */
729  else
730  {
731  NameData plugin_name;
732  TransactionId xmin_horizon = InvalidTransactionId;
733 
734  /* Skip creating the local slot if remote_slot is invalidated already */
735  if (remote_slot->invalidated != RS_INVAL_NONE)
736  return false;
737 
738  /*
739  * We create temporary slots instead of ephemeral slots here because
740  * we want the slots to survive after releasing them. This is done to
741  * avoid dropping and re-creating the slots in each synchronization
742  * cycle if the restart_lsn or catalog_xmin of the remote slot has not
743  * caught up.
744  */
745  ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
746  remote_slot->two_phase,
747  remote_slot->failover,
748  true);
749 
750  /* For shorter lines. */
751  slot = MyReplicationSlot;
752 
753  /* Avoid expensive operations while holding a spinlock. */
754  namestrcpy(&plugin_name, remote_slot->plugin);
755 
756  SpinLockAcquire(&slot->mutex);
757  slot->data.database = remote_dbid;
758  slot->data.plugin = plugin_name;
759  SpinLockRelease(&slot->mutex);
760 
762 
763  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
764  xmin_horizon = GetOldestSafeDecodingTransactionId(true);
765  SpinLockAcquire(&slot->mutex);
766  slot->effective_catalog_xmin = xmin_horizon;
767  slot->data.catalog_xmin = xmin_horizon;
768  SpinLockRelease(&slot->mutex);
770  LWLockRelease(ProcArrayLock);
771 
772  update_and_persist_local_synced_slot(remote_slot, remote_dbid);
773 
774  slot_updated = true;
775  }
776 
778 
779  return slot_updated;
780 }
uint32 TransactionId
Definition: c.h:652
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1155
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1228
@ LW_EXCLUSIVE
Definition: lwlock.h:114
#define AmLogicalSlotSyncWorkerProcess()
Definition: miscadmin.h:378
void namestrcpy(Name name, const char *str)
Definition: name.c:233
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition: procarray.c:2949
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:464
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:309
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1010
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1049
void ReplicationSlotSave(void)
Definition: slot.c:992
@ RS_TEMPORARY
Definition: slot.h:37
static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
Definition: slotsync.c:474
static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
Definition: slotsync.c:545
static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
Definition: slotsync.c:168
bool two_phase
Definition: slotsync.c:140
char * plugin
Definition: slotsync.c:138
char * name
Definition: slotsync.c:137
bool failover
Definition: slotsync.c:141
ReplicationSlotInvalidationCause invalidated
Definition: slotsync.c:147
XLogRecPtr confirmed_lsn
Definition: slotsync.c:143
XLogRecPtr restart_lsn
Definition: slotsync.c:142
TransactionId catalog_xmin
Definition: slot.h:90
XLogRecPtr confirmed_flush
Definition: slot.h:104
ReplicationSlotPersistency persistency
Definition: slot.h:74
TransactionId effective_catalog_xmin
Definition: slot.h:175
Definition: c.h:741
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3503
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AmLogicalSlotSyncWorkerProcess, Assert, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, RemoteSlot::confirmed_lsn, ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::effective_catalog_xmin, ereport, errcode(), errdetail_internal(), errmsg(), errmsg_internal(), ERROR, RemoteSlot::failover, GetOldestSafeDecodingTransactionId(), GetStandbyFlushRecPtr(), RemoteSlot::invalidated, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LOG, LSN_FORMAT_ARGS, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, RemoteSlot::name, namestrcpy(), ReplicationSlotPersistentData::persistency, RemoteSlot::plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotAcquire(), ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredXmin(), reserve_wal_for_local_slot(), RemoteSlot::restart_lsn, RS_INVAL_NONE, RS_TEMPORARY, SearchNamedReplicationSlot(), SpinLockAcquire, SpinLockRelease, ReplicationSlotPersistentData::synced, RemoteSlot::two_phase, update_and_persist_local_synced_slot(), and update_local_synced_slot().

Referenced by synchronize_slots().

◆ synchronize_slots()

static bool synchronize_slots ( WalReceiverConn wrconn)
static

Definition at line 791 of file slotsync.c.

792 {
793 #define SLOTSYNC_COLUMN_COUNT 9
794  Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
795  LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
796 
798  TupleTableSlot *tupslot;
799  List *remote_slot_list = NIL;
800  bool some_slot_updated = false;
801  bool started_tx = false;
802  const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
803  " restart_lsn, catalog_xmin, two_phase, failover,"
804  " database, invalidation_reason"
805  " FROM pg_catalog.pg_replication_slots"
806  " WHERE failover and NOT temporary";
807 
808  /* The syscache access in walrcv_exec() needs a transaction env. */
809  if (!IsTransactionState())
810  {
812  started_tx = true;
813  }
814 
815  /* Execute the query */
816  res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
817  if (res->status != WALRCV_OK_TUPLES)
818  ereport(ERROR,
819  errmsg("could not fetch failover logical slots info from the primary server: %s",
820  res->err));
821 
822  /* Construct the remote_slot tuple and synchronize each slot locally */
823  tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
824  while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
825  {
826  bool isnull;
827  RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
828  Datum d;
829  int col = 0;
830 
831  remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
832  &isnull));
833  Assert(!isnull);
834 
835  remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
836  &isnull));
837  Assert(!isnull);
838 
839  /*
840  * It is possible to get null values for LSN and Xmin if slot is
841  * invalidated on the primary server, so handle accordingly.
842  */
843  d = slot_getattr(tupslot, ++col, &isnull);
844  remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
845  DatumGetLSN(d);
846 
847  d = slot_getattr(tupslot, ++col, &isnull);
848  remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
849 
850  d = slot_getattr(tupslot, ++col, &isnull);
851  remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
853 
854  remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
855  &isnull));
856  Assert(!isnull);
857 
858  remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
859  &isnull));
860  Assert(!isnull);
861 
862  remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
863  ++col, &isnull));
864  Assert(!isnull);
865 
866  d = slot_getattr(tupslot, ++col, &isnull);
867  remote_slot->invalidated = isnull ? RS_INVAL_NONE :
869 
870  /* Sanity check */
872 
873  /*
874  * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
875  * slot is valid, that means we have fetched the remote_slot in its
876  * RS_EPHEMERAL state. In such a case, don't sync it; we can always
877  * sync it in the next sync cycle when the remote_slot is persisted
878  * and has valid lsn(s) and xmin values.
879  *
880  * XXX: In future, if we plan to expose 'slot->data.persistency' in
881  * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
882  * slots in the first place.
883  */
884  if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
885  XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
886  !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
887  remote_slot->invalidated == RS_INVAL_NONE)
888  pfree(remote_slot);
889  else
890  /* Create list of remote slots */
891  remote_slot_list = lappend(remote_slot_list, remote_slot);
892 
893  ExecClearTuple(tupslot);
894  }
895 
896  /* Drop local slots that no longer need to be synced. */
897  drop_local_obsolete_slots(remote_slot_list);
898 
899  /* Now sync the slots locally */
900  foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
901  {
902  Oid remote_dbid = get_database_oid(remote_slot->database, false);
903 
904  /*
905  * Use shared lock to prevent a conflict with
906  * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
907  * a drop-database operation.
908  */
909  LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
910 
911  some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
912 
913  UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
914  }
915 
916  /* We are done, free remote_slot_list elements */
917  list_free_deep(remote_slot_list);
918 
920 
921  if (started_tx)
922  CommitTransactionCommand();
923 
924  return some_slot_updated;
925 }
#define TextDatumGetCString(d)
Definition: builtins.h:98
Oid get_database_oid(const char *dbname, bool missing_ok)
Definition: dbcommands.c:3119
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1325
void list_free_deep(List *list)
Definition: list.c:1560
void * palloc0(Size size)
Definition: mcxt.c:1347
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
static TransactionId DatumGetTransactionId(Datum X)
Definition: postgres.h:262
unsigned int Oid
Definition: postgres_ext.h:31
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason)
Definition: slot.c:2392
static void drop_local_obsolete_slots(List *remote_slot_list)
Definition: slotsync.c:417
#define SLOTSYNC_COLUMN_COUNT
static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
Definition: slotsync.c:609
char * database
Definition: slotsync.c:139
TransactionId catalog_xmin
Definition: slotsync.c:144
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1120
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:395
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:206
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:468
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:462
bool IsTransactionState(void)
Definition: xact.c:385
void StartTransactionCommand(void)
Definition: xact.c:3033
void CommitTransactionCommand(void)
Definition: xact.c:3131
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessShareLock, Assert, RemoteSlot::catalog_xmin, CommitTransactionCommand(), RemoteSlot::confirmed_lsn, RemoteSlot::database, DatumGetBool(), DatumGetLSN(), DatumGetTransactionId(), drop_local_obsolete_slots(), ereport, errmsg(), ERROR, ExecClearTuple(), RemoteSlot::failover, foreach_ptr, get_database_oid(), GetSlotInvalidationCause(), RemoteSlot::invalidated, InvalidTransactionId, InvalidXLogRecPtr, IsTransactionState(), lappend(), list_free_deep(), LockSharedObject(), MakeSingleTupleTableSlot(), RemoteSlot::name, NIL, palloc0(), pfree(), RemoteSlot::plugin, res, RemoteSlot::restart_lsn, RS_INVAL_NONE, slot_getattr(), SLOTSYNC_COLUMN_COUNT, StartTransactionCommand(), synchronize_one_slot(), TextDatumGetCString, TransactionIdIsValid, TTSOpsMinimalTuple, tuplestore_gettupleslot(), RemoteSlot::two_phase, UnlockSharedObject(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, wrconn, and XLogRecPtrIsInvalid.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ SyncReplicationSlots()

void SyncReplicationSlots ( WalReceiverConn wrconn)

Definition at line 1726 of file slotsync.c.

1727 {
1729  {
1731 
1733 
1735 
1736  /* Cleanup the synced temporary slots */
1737  ReplicationSlotCleanup(true);
1738 
1739  /* We are done with sync, so reset sync flag */
1741  }
1743 }
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:52
static void slotsync_failure_callback(int code, Datum arg)
Definition: slotsync.c:1689

References check_and_set_sync_info(), InvalidPid, PG_END_ENSURE_ERROR_CLEANUP, PG_ENSURE_ERROR_CLEANUP, PointerGetDatum(), ReplicationSlotCleanup(), reset_syncing_flag(), slotsync_failure_callback(), synchronize_slots(), validate_remote_info(), and wrconn.

Referenced by pg_sync_replication_slots().

◆ update_and_persist_local_synced_slot()

static bool update_and_persist_local_synced_slot ( RemoteSlot remote_slot,
Oid  remote_dbid 
)
static

Definition at line 545 of file slotsync.c.

546 {
548  bool found_consistent_snapshot = false;
549  bool remote_slot_precedes = false;
550 
551  (void) update_local_synced_slot(remote_slot, remote_dbid,
552  &found_consistent_snapshot,
553  &remote_slot_precedes);
554 
555  /*
556  * Check if the primary server has caught up. Refer to the comment atop
557  * the file for details on this check.
558  */
559  if (remote_slot_precedes)
560  {
561  /*
562  * The remote slot didn't catch up to locally reserved position.
563  *
564  * We do not drop the slot because the restart_lsn can be ahead of the
565  * current location when recreating the slot in the next cycle. It may
566  * take more time to create such a slot. Therefore, we keep this slot
567  * and attempt the synchronization in the next cycle.
568  */
569  return false;
570  }
571 
572  /*
573  * Don't persist the slot if it cannot reach the consistent point from the
574  * restart_lsn. See comments atop this file.
575  */
576  if (!found_consistent_snapshot)
577  {
578  ereport(LOG,
579  errmsg("could not sync slot \"%s\"", remote_slot->name),
580  errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
582 
583  return false;
584  }
585 
587 
588  ereport(LOG,
589  errmsg("newly created slot \"%s\" is sync-ready now",
590  remote_slot->name));
591 
592  return true;
593 }
int errdetail(const char *fmt,...)
Definition: elog.c:1201
void ReplicationSlotPersist(void)
Definition: slot.c:1027

References ReplicationSlot::data, ereport, errdetail(), errmsg(), LOG, LSN_FORMAT_ARGS, MyReplicationSlot, RemoteSlot::name, ReplicationSlotPersist(), ReplicationSlotPersistentData::restart_lsn, and update_local_synced_slot().

Referenced by synchronize_one_slot().

◆ update_local_synced_slot()

static bool update_local_synced_slot ( RemoteSlot remote_slot,
Oid  remote_dbid,
bool found_consistent_snapshot,
bool remote_slot_precedes 
)
static

Definition at line 168 of file slotsync.c.

171 {
173  bool updated_xmin_or_lsn = false;
174  bool updated_config = false;
175 
177 
178  if (found_consistent_snapshot)
179  *found_consistent_snapshot = false;
180 
181  if (remote_slot_precedes)
182  *remote_slot_precedes = false;
183 
184  /*
185  * Don't overwrite if we already have a newer catalog_xmin and
186  * restart_lsn.
187  */
188  if (remote_slot->restart_lsn < slot->data.restart_lsn ||
189  TransactionIdPrecedes(remote_slot->catalog_xmin,
190  slot->data.catalog_xmin))
191  {
192  /*
193  * This can happen in the following situations:
194  *
195  * If the slot is temporary, it means either the initial WAL location
196  * reserved for the local slot is ahead of the remote slot's
197  * restart_lsn or the initial xmin_horizon computed for the local slot
198  * is ahead of the remote slot.
199  *
200  * If the slot is persistent, restart_lsn of the synced slot could
201  * still be ahead of the remote slot. Since we use slot advance
202  * functionality to keep snapbuild/slot updated, it is possible that
203  * the restart_lsn is advanced to a later position than it has on the
204  * primary. This can happen when slot advancing machinery finds
205  * running xacts record after reaching the consistent state at a later
206  * point than the primary where it serializes the snapshot and updates
207  * the restart_lsn.
208  *
209  * We LOG the message if the slot is temporary as it can help the user
210  * to understand why the slot is not sync-ready. In the case of a
211  * persistent slot, it would be a more common case and won't directly
212  * impact the users, so we used DEBUG1 level to log the message.
213  */
215  errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
216  remote_slot->name),
217  errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
218  LSN_FORMAT_ARGS(remote_slot->restart_lsn),
219  remote_slot->catalog_xmin,
221  slot->data.catalog_xmin));
222 
223  if (remote_slot_precedes)
224  *remote_slot_precedes = true;
225  }
226 
227  /*
228  * Attempt to sync LSNs and xmins only if remote slot is ahead of local
229  * slot.
230  */
231  else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
232  remote_slot->restart_lsn > slot->data.restart_lsn ||
233  TransactionIdFollows(remote_slot->catalog_xmin,
234  slot->data.catalog_xmin))
235  {
236  /*
237  * We can't directly copy the remote slot's LSN or xmin unless there
238  * exists a consistent snapshot at that point. Otherwise, after
239  * promotion, the slots may not reach a consistent point before the
240  * confirmed_flush_lsn which can lead to data loss. To avoid data
241  * loss, we let slot machinery advance the slot which ensures that
242  * snapbuilder/slot statuses are updated properly.
243  */
244  if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
245  {
246  /*
247  * Update the slot info directly if there is a serialized snapshot
248  * at the restart_lsn, as the slot can quickly reach consistency
249  * at restart_lsn by restoring the snapshot.
250  */
251  SpinLockAcquire(&slot->mutex);
252  slot->data.restart_lsn = remote_slot->restart_lsn;
253  slot->data.confirmed_flush = remote_slot->confirmed_lsn;
254  slot->data.catalog_xmin = remote_slot->catalog_xmin;
255  SpinLockRelease(&slot->mutex);
256 
257  if (found_consistent_snapshot)
258  *found_consistent_snapshot = true;
259  }
260  else
261  {
263  found_consistent_snapshot);
264 
265  /* Sanity check */
266  if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
267  ereport(ERROR,
268  errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
269  remote_slot->name),
270  errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
271  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
273  }
274 
275  updated_xmin_or_lsn = true;
276  }
277 
278  if (remote_dbid != slot->data.database ||
279  remote_slot->two_phase != slot->data.two_phase ||
280  remote_slot->failover != slot->data.failover ||
281  strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
282  {
283  NameData plugin_name;
284 
285  /* Avoid expensive operations while holding a spinlock. */
286  namestrcpy(&plugin_name, remote_slot->plugin);
287 
288  SpinLockAcquire(&slot->mutex);
289  slot->data.plugin = plugin_name;
290  slot->data.database = remote_dbid;
291  slot->data.two_phase = remote_slot->two_phase;
292  slot->data.failover = remote_slot->failover;
293  SpinLockRelease(&slot->mutex);
294 
295  updated_config = true;
296  }
297 
298  /*
299  * We have to write the changed xmin to disk *before* we change the
300  * in-memory value, otherwise after a crash we wouldn't know that some
301  * catalog tuples might have been removed already.
302  */
303  if (updated_config || updated_xmin_or_lsn)
304  {
307  }
308 
309  /*
310  * Now the new xmin is safely on disk, we can let the global value
311  * advance. We do not take ProcArrayLock or similar since we only advance
312  * xmin here and there's not much harm done by a concurrent computation
313  * missing that.
314  */
315  if (updated_xmin_or_lsn)
316  {
317  SpinLockAcquire(&slot->mutex);
318  slot->effective_catalog_xmin = remote_slot->catalog_xmin;
319  SpinLockRelease(&slot->mutex);
320 
323  }
324 
325  return updated_config || updated_xmin_or_lsn;
326 }
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
Definition: logical.c:2061
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
Definition: snapbuild.c:2159
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
Definition: transam.c:314

References Assert, RemoteSlot::catalog_xmin, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, RemoteSlot::confirmed_lsn, ReplicationSlot::data, ReplicationSlotPersistentData::database, DEBUG1, ReplicationSlot::effective_catalog_xmin, ereport, errdetail(), errdetail_internal(), errmsg(), errmsg_internal(), ERROR, RemoteSlot::failover, ReplicationSlotPersistentData::failover, ReplicationSlotPersistentData::invalidated, LOG, LogicalSlotAdvanceAndCheckSnapState(), LSN_FORMAT_ARGS, ReplicationSlot::mutex, MyReplicationSlot, RemoteSlot::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, RemoteSlot::plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotMarkDirty(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RemoteSlot::restart_lsn, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_TEMPORARY, SnapBuildSnapshotExists(), SpinLockAcquire, SpinLockRelease, TransactionIdFollows(), TransactionIdPrecedes(), RemoteSlot::two_phase, and ReplicationSlotPersistentData::two_phase.

Referenced by synchronize_one_slot(), and update_and_persist_local_synced_slot().

◆ update_synced_slots_inactive_since()

static void update_synced_slots_inactive_since ( void  )
static

Definition at line 1511 of file slotsync.c.

1512 {
1513  TimestampTz now = 0;
1514 
1515  /*
1516  * We need to update inactive_since only when we are promoting standby to
1517  * correctly interpret the inactive_since if the standby gets promoted
1518  * without a restart. We don't want the slots to appear inactive for a
1519  * long time after promotion if they haven't been synchronized recently.
1520  * Whoever acquires the slot, i.e., makes the slot active, will reset it.
1521  */
1522  if (!StandbyMode)
1523  return;
1524 
1525  /* The slot sync worker or SQL function mustn't be running by now */
1527 
1528  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1529 
1530  for (int i = 0; i < max_replication_slots; i++)
1531  {
1533 
1534  /* Check if it is a synchronized slot */
1535  if (s->in_use && s->data.synced)
1536  {
1537  Assert(SlotIsLogical(s));
1538 
1539  /* The slot must not be acquired by any process */
1540  Assert(s->active_pid == 0);
1541 
1542  /* Use the same inactive_since time for all the slots. */
1543  if (now == 0)
1545 
1546  SpinLockAcquire(&s->mutex);
1547  s->inactive_since = now;
1548  SpinLockRelease(&s->mutex);
1549  }
1550  }
1551 
1552  LWLockRelease(ReplicationSlotControlLock);
1553 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
int64 TimestampTz
Definition: timestamp.h:39
pid_t active_pid
Definition: slot.h:157
TimestampTz inactive_since
Definition: slot.h:206
bool StandbyMode
Definition: xlogrecovery.c:147

References ReplicationSlot::active_pid, Assert, ReplicationSlot::data, GetCurrentTimestamp(), i, ReplicationSlot::in_use, ReplicationSlot::inactive_since, InvalidPid, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, ReplicationSlot::mutex, now(), SlotSyncCtxStruct::pid, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, SlotSyncCtx, SpinLockAcquire, SpinLockRelease, StandbyMode, ReplicationSlotPersistentData::synced, and SlotSyncCtxStruct::syncing.

Referenced by ShutDownSlotSync().

◆ validate_remote_info()

static void validate_remote_info ( WalReceiverConn wrconn)
static

Definition at line 934 of file slotsync.c.

935 {
936 #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
938  Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
939  StringInfoData cmd;
940  bool isnull;
941  TupleTableSlot *tupslot;
942  bool remote_in_recovery;
943  bool primary_slot_valid;
944  bool started_tx = false;
945 
946  initStringInfo(&cmd);
947  appendStringInfo(&cmd,
948  "SELECT pg_is_in_recovery(), count(*) = 1"
949  " FROM pg_catalog.pg_replication_slots"
950  " WHERE slot_type='physical' AND slot_name=%s",
952 
953  /* The syscache access in walrcv_exec() needs a transaction env. */
954  if (!IsTransactionState())
955  {
957  started_tx = true;
958  }
959 
961  pfree(cmd.data);
962 
963  if (res->status != WALRCV_OK_TUPLES)
964  ereport(ERROR,
965  errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s",
966  PrimarySlotName, res->err),
967  errhint("Check if primary_slot_name is configured correctly."));
968 
969  tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
970  if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
971  elog(ERROR,
972  "failed to fetch tuple for the primary server slot specified by primary_slot_name");
973 
974  remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
975  Assert(!isnull);
976 
977  /*
978  * Slot sync is currently not supported on a cascading standby. This is
979  * because if we allow it, the primary server needs to wait for all the
980  * cascading standbys, otherwise, logical subscribers can still be ahead
981  * of one of the cascading standbys which we plan to promote. Thus, to
982  * avoid this additional complexity, we restrict it for the time being.
983  */
984  if (remote_in_recovery)
985  ereport(ERROR,
986  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
987  errmsg("cannot synchronize replication slots from a standby server"));
988 
989  primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
990  Assert(!isnull);
991 
992  if (!primary_slot_valid)
993  ereport(ERROR,
994  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
995  errmsg("slot synchronization requires valid primary_slot_name"),
996  /* translator: second %s is a GUC variable name */
997  errdetail("The replication slot \"%s\" specified by %s does not exist on the primary server.",
998  PrimarySlotName, "primary_slot_name"));
999 
1000  ExecClearTuple(tupslot);
1002 
1003  if (started_tx)
1005 }
int errhint(const char *fmt,...)
Definition: elog.c:1315
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
#define PRIMARY_INFO_OUTPUT_COL_COUNT

References appendStringInfo(), Assert, CommitTransactionCommand(), StringInfoData::data, DatumGetBool(), elog, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, ExecClearTuple(), initStringInfo(), IsTransactionState(), MakeSingleTupleTableSlot(), pfree(), PRIMARY_INFO_OUTPUT_COL_COUNT, PrimarySlotName, quote_literal_cstr(), res, slot_getattr(), StartTransactionCommand(), TTSOpsMinimalTuple, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and wrconn.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ ValidateSlotSyncParams()

bool ValidateSlotSyncParams ( int  elevel)

Definition at line 1039 of file slotsync.c.

1040 {
1041  /*
1042  * Logical slot sync/creation requires wal_level >= logical.
1043  *
1044  * Altering wal_level requires a server restart, so error out in
1045  * this case regardless of the elevel provided by the caller.
1046  */
1048  ereport(ERROR,
1049  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1050  errmsg("slot synchronization requires \"wal_level\" >= \"logical\""));
1051 
1052  /*
1053  * A physical replication slot (primary_slot_name) is required on the
1054  * primary to ensure that the rows needed by the standby are not removed
1055  * after restarting, so that the synchronized slot on the standby will not
1056  * be invalidated.
1057  */
1058  if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
1059  {
1060  ereport(elevel,
1061  /* translator: %s is a GUC variable name */
1062  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1063  errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
1064  return false;
1065  }
1066 
1067  /*
1068  * hot_standby_feedback must be enabled to cooperate with the physical
1069  * replication slot, which allows informing the primary about the xmin and
1070  * catalog_xmin values on the standby.
1071  */
1072  if (!hot_standby_feedback)
1073  {
1074  ereport(elevel,
1075  /* translator: %s is a GUC variable name */
1076  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1077  errmsg("slot synchronization requires %s to be enabled",
1078  "hot_standby_feedback"));
1079  return false;
1080  }
1081 
1082  /*
1083  * primary_conninfo is required to connect to the primary in order to
1084  * fetch slot information.
1085  */
1086  if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
1087  {
1088  ereport(elevel,
1089  /* translator: %s is a GUC variable name */
1090  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1091  errmsg("slot synchronization requires %s to be defined",
1092  "primary_conninfo"));
1093  return false;
1094  }
1095 
1096  return true;
1097 }
int wal_level
Definition: xlog.c:131
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:74

References ereport, errcode(), errmsg(), ERROR, hot_standby_feedback, PrimaryConnInfo, PrimarySlotName, wal_level, and WAL_LEVEL_LOGICAL.

Referenced by MaybeStartSlotSyncWorker(), and pg_sync_replication_slots().

◆ wait_for_slot_activity()

static void wait_for_slot_activity ( bool  some_slot_updated)
static

Definition at line 1236 of file slotsync.c.

1237 {
1238  int rc;
1239 
1240  if (!some_slot_updated)
1241  {
1242  /*
1243  * No slots were updated, so double the sleep time, but not beyond the
1244  * maximum allowable value.
1245  */
1247  }
1248  else
1249  {
1250  /*
1251  * Some slots were updated since the last sleep, so reset the sleep
1252  * time.
1253  */
1255  }
1256 
1257  rc = WaitLatch(MyLatch,
1259  sleep_ms,
1260  WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1261 
1262  if (rc & WL_LATCH_SET)
1264 }
#define Min(x, y)
Definition: c.h:1004
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS
Definition: slotsync.c:116
static long sleep_ms
Definition: slotsync.c:119
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS
Definition: slotsync.c:117

References MAX_SLOTSYNC_WORKER_NAPTIME_MS, Min, MIN_SLOTSYNC_WORKER_NAPTIME_MS, MyLatch, ResetLatch(), sleep_ms, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by ReplSlotSyncWorkerMain().

Variable Documentation

◆ sleep_ms

long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS
static

Definition at line 119 of file slotsync.c.

Referenced by do_watch(), and wait_for_slot_activity().

◆ SlotSyncCtx

◆ sync_replication_slots

bool sync_replication_slots = false

Definition at line 109 of file slotsync.c.

Referenced by MaybeStartSlotSyncWorker(), and slotsync_reread_config().

◆ syncing_slots