PostgreSQL Source Code git master
slotsync.c File Reference
#include "postgres.h"
#include <time.h>
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_database.h"
#include "libpq/pqsignal.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/snapbuild.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
Include dependency graph for slotsync.c:

Go to the source code of this file.

Data Structures

struct  SlotSyncCtxStruct
 
struct  RemoteSlot
 

Macros

#define MIN_SLOTSYNC_WORKER_NAPTIME_MS   200
 
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS   30000 /* 30s */
 
#define SLOTSYNC_RESTART_INTERVAL_SEC   10
 
#define SLOTSYNC_COLUMN_COUNT   10
 
#define PRIMARY_INFO_OUTPUT_COL_COUNT   2
 

Typedefs

typedef struct SlotSyncCtxStruct SlotSyncCtxStruct
 
typedef struct RemoteSlot RemoteSlot
 

Functions

static void slotsync_failure_callback (int code, Datum arg)
 
static void update_synced_slots_inactive_since (void)
 
static void update_slotsync_skip_stats (SlotSyncSkipReason skip_reason)
 
static bool update_local_synced_slot (RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
 
static List * get_local_synced_slots (void)
 
static bool local_sync_slot_required (ReplicationSlot *local_slot, List *remote_slots)
 
static void drop_local_obsolete_slots (List *remote_slot_list)
 
static void reserve_wal_for_local_slot (XLogRecPtr restart_lsn)
 
static bool update_and_persist_local_synced_slot (RemoteSlot *remote_slot, Oid remote_dbid)
 
static bool synchronize_one_slot (RemoteSlot *remote_slot, Oid remote_dbid)
 
static bool synchronize_slots (WalReceiverConn *wrconn)
 
static void validate_remote_info (WalReceiverConn *wrconn)
 
char * CheckAndGetDbnameFromConninfo (void)
 
bool ValidateSlotSyncParams (int elevel)
 
static void slotsync_reread_config (void)
 
static void ProcessSlotSyncInterrupts (void)
 
static void slotsync_worker_disconnect (int code, Datum arg)
 
static void slotsync_worker_onexit (int code, Datum arg)
 
static void wait_for_slot_activity (bool some_slot_updated)
 
static void check_and_set_sync_info (pid_t sync_process_pid)
 
static void reset_syncing_flag (void)
 
void ReplSlotSyncWorkerMain (const void *startup_data, size_t startup_data_len)
 
void ShutDownSlotSync (void)
 
bool SlotSyncWorkerCanRestart (void)
 
bool IsSyncingReplicationSlots (void)
 
Size SlotSyncShmemSize (void)
 
void SlotSyncShmemInit (void)
 
void SyncReplicationSlots (WalReceiverConn *wrconn)
 

Variables

static SlotSyncCtxStruct * SlotSyncCtx = NULL
 
bool sync_replication_slots = false
 
static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS
 
static bool syncing_slots = false
 

Macro Definition Documentation

◆ MAX_SLOTSYNC_WORKER_NAPTIME_MS

#define MAX_SLOTSYNC_WORKER_NAPTIME_MS   30000 /* 30s */

Definition at line 117 of file slotsync.c.

◆ MIN_SLOTSYNC_WORKER_NAPTIME_MS

#define MIN_SLOTSYNC_WORKER_NAPTIME_MS   200

Definition at line 116 of file slotsync.c.

◆ PRIMARY_INFO_OUTPUT_COL_COUNT

#define PRIMARY_INFO_OUTPUT_COL_COUNT   2

◆ SLOTSYNC_COLUMN_COUNT

#define SLOTSYNC_COLUMN_COUNT   10

◆ SLOTSYNC_RESTART_INTERVAL_SEC

#define SLOTSYNC_RESTART_INTERVAL_SEC   10

Definition at line 122 of file slotsync.c.

Typedef Documentation

◆ RemoteSlot

typedef struct RemoteSlot RemoteSlot

◆ SlotSyncCtxStruct

typedef struct SlotSyncCtxStruct SlotSyncCtxStruct

Function Documentation

◆ check_and_set_sync_info()

static void check_and_set_sync_info ( pid_t  sync_process_pid)
static

Definition at line 1416 of file slotsync.c.

1417{
1418 SpinLockAcquire(&SlotSyncCtx->mutex);
1419
1420 if (SlotSyncCtx->syncing)
1421 {
1422 SpinLockRelease(&SlotSyncCtx->mutex);
1423 ereport(ERROR,
1424 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1425 errmsg("cannot synchronize replication slots concurrently"));
1426 }
1427
1428 /* The pid must not be already assigned in SlotSyncCtx */
1429 Assert(SlotSyncCtx->pid == InvalidPid);
1430
1431 SlotSyncCtx->syncing = true;
1432
1433 /*
1434 * Advertise the required PID so that the startup process can kill the
1435 * slot sync process on promotion.
1436 */
1437 SlotSyncCtx->pid = sync_process_pid;
1438
1439 SpinLockRelease(&SlotSyncCtx->mutex);
1440
1441 syncing_slots = true;
1442}
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
Assert(PointerIsAligned(start, uint64))
#define InvalidPid
Definition: miscadmin.h:32
static SlotSyncCtxStruct * SlotSyncCtx
Definition: slotsync.c:106
static bool syncing_slots
Definition: slotsync.c:129
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59

References Assert(), ereport, errcode(), errmsg(), ERROR, InvalidPid, SlotSyncCtxStruct::mutex, SlotSyncCtxStruct::pid, SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::syncing, and syncing_slots.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ CheckAndGetDbnameFromConninfo()

char * CheckAndGetDbnameFromConninfo ( void  )

Definition at line 1113 of file slotsync.c.

1114{
1115 char *dbname;
1116
1117 /*
1118 * The slot synchronization needs a database connection for walrcv_exec to
1119 * work.
1120 */
1121 dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
1122 if (dbname == NULL)
1123 ereport(ERROR,
1124 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1125
1126 /*
1127 * translator: first %s is a connection option; second %s is a GUC
1128 * variable name
1129 */
1130 errmsg("replication slot synchronization requires \"%s\" to be specified in \"%s\"",
1131 "dbname", "primary_conninfo"));
1132 return dbname;
1133}
char * dbname
Definition: streamutil.c:49
#define walrcv_get_dbname_from_conninfo(conninfo)
Definition: walreceiver.h:445
char * PrimaryConnInfo
Definition: xlogrecovery.c:99

References dbname, ereport, errcode(), errmsg(), ERROR, PrimaryConnInfo, and walrcv_get_dbname_from_conninfo.

Referenced by pg_sync_replication_slots(), and ReplSlotSyncWorkerMain().

◆ drop_local_obsolete_slots()

static void drop_local_obsolete_slots ( List * remote_slot_list)
static

Definition at line 478 of file slotsync.c.

479{
480 List *local_slots = get_local_synced_slots();
481
482 foreach_ptr(ReplicationSlot, local_slot, local_slots)
483 {
484 /* Drop the local slot if it is not required to be retained. */
485 if (!local_sync_slot_required(local_slot, remote_slot_list))
486 {
487 bool synced_slot;
488
489 /*
490 * Use shared lock to prevent a conflict with
491 * ReplicationSlotsDropDBSlots(), trying to drop the same slot
492 * during a drop-database operation.
493 */
494 LockSharedObject(DatabaseRelationId, local_slot->data.database,
495 0, AccessShareLock);
496
497 /*
498 * In the small window between getting the slot to drop and
499 * locking the database, there is a possibility of a parallel
500 * database drop by the startup process and the creation of a new
501 * slot by the user. This new user-created slot may end up using
502 * the same shared memory as that of 'local_slot'. Thus check if
503 * local_slot is still the synced one before performing actual
504 * drop.
505 */
506 SpinLockAcquire(&local_slot->mutex);
507 synced_slot = local_slot->in_use && local_slot->data.synced;
508 SpinLockRelease(&local_slot->mutex);
509
510 if (synced_slot)
511 {
512 ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
513 ReplicationSlotDropAcquired();
514 }
515
516 UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
517 0, AccessShareLock);
518
519 ereport(LOG,
520 errmsg("dropped replication slot \"%s\" of database with OID %u",
521 NameStr(local_slot->data.name),
522 local_slot->data.database));
523 }
524 }
525}
#define NameStr(name)
Definition: c.h:765
#define LOG
Definition: elog.h:31
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1148
#define AccessShareLock
Definition: lockdefs.h:36
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:626
void ReplicationSlotDropAcquired(void)
Definition: slot.c:997
static List * get_local_synced_slots(void)
Definition: slotsync.c:394
static bool local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
Definition: slotsync.c:425
Definition: pg_list.h:54

References AccessShareLock, ereport, errmsg(), foreach_ptr, get_local_synced_slots(), local_sync_slot_required(), LockSharedObject(), LOG, NameStr, ReplicationSlotAcquire(), ReplicationSlotDropAcquired(), SpinLockAcquire, SpinLockRelease, and UnlockSharedObject().

Referenced by synchronize_slots().

◆ get_local_synced_slots()

static List * get_local_synced_slots ( void  )
static

Definition at line 394 of file slotsync.c.

395{
396 List *local_slots = NIL;
397
398 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
399
400 for (int i = 0; i < max_replication_slots; i++)
401 {
402 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
403
404 /* Check if it is a synchronized slot */
405 if (s->in_use && s->data.synced)
406 {
407 Assert(SlotIsLogical(s));
408 local_slots = lappend(local_slots, s);
409 }
410 }
411
412 LWLockRelease(ReplicationSlotControlLock);
413
414 return local_slots;
415}
int i
Definition: isn.c:77
List * lappend(List *list, void *datum)
Definition: list.c:339
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
#define NIL
Definition: pg_list.h:68
int max_replication_slots
Definition: slot.c:151
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:145
#define SlotIsLogical(slot)
Definition: slot.h:285
ReplicationSlot replication_slots[1]
Definition: slot.h:296
bool in_use
Definition: slot.h:186
ReplicationSlotPersistentData data
Definition: slot.h:210

References Assert(), ReplicationSlot::data, i, ReplicationSlot::in_use, lappend(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, NIL, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, SlotIsLogical, and ReplicationSlotPersistentData::synced.

Referenced by drop_local_obsolete_slots().

◆ IsSyncingReplicationSlots()

bool IsSyncingReplicationSlots ( void  )

Definition at line 1797 of file slotsync.c.

1798{
1799 return syncing_slots;
1800}

References syncing_slots.

Referenced by CreateDecodingContext(), GetStandbyFlushRecPtr(), and ReplicationSlotCreate().

◆ local_sync_slot_required()

static bool local_sync_slot_required ( ReplicationSlot * local_slot,
List * remote_slots 
)
static

Definition at line 425 of file slotsync.c.

426{
427 bool remote_exists = false;
428 bool locally_invalidated = false;
429
430 foreach_ptr(RemoteSlot, remote_slot, remote_slots)
431 {
432 if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
433 {
434 remote_exists = true;
435
436 /*
437 * If remote slot is not invalidated but local slot is marked as
438 * invalidated, then set locally_invalidated flag.
439 */
440 SpinLockAcquire(&local_slot->mutex);
441 locally_invalidated =
442 (remote_slot->invalidated == RS_INVAL_NONE) &&
443 (local_slot->data.invalidated != RS_INVAL_NONE);
444 SpinLockRelease(&local_slot->mutex);
445
446 break;
447 }
448 }
449
450 return (remote_exists && !locally_invalidated);
451}
@ RS_INVAL_NONE
Definition: slot.h:60
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:128
slock_t mutex
Definition: slot.h:183

References ReplicationSlot::data, foreach_ptr, ReplicationSlotPersistentData::invalidated, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, RS_INVAL_NONE, SpinLockAcquire, and SpinLockRelease.

Referenced by drop_local_obsolete_slots().

◆ ProcessSlotSyncInterrupts()

static void ProcessSlotSyncInterrupts ( void  )
static

Definition at line 1287 of file slotsync.c.

1288{
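1289 CHECK_FOR_INTERRUPTS();
1290
1291 if (SlotSyncCtx->stopSignaled)
1292 {
1293 if (AmLogicalSlotSyncWorkerProcess())
1294 {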
1295 ereport(LOG,
1296 errmsg("replication slot synchronization worker will stop because promotion is triggered"));
1297
1298 proc_exit(0);
1299 }
1300 else
1301 {
1302 /*
1303 * For the backend executing SQL function
1304 * pg_sync_replication_slots().
1305 */
1306 ereport(ERROR,
1307 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308 errmsg("replication slot synchronization will stop because promotion is triggered"));
1309 }
1310 }
1311
1312 if (ConfigReloadPending)
1313 slotsync_reread_config();
1314}
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void proc_exit(int code)
Definition: ipc.c:104
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define AmLogicalSlotSyncWorkerProcess()
Definition: miscadmin.h:386
static void slotsync_reread_config(void)
Definition: slotsync.c:1207

References AmLogicalSlotSyncWorkerProcess, CHECK_FOR_INTERRUPTS, ConfigReloadPending, ereport, errcode(), errmsg(), ERROR, LOG, proc_exit(), slotsync_reread_config(), SlotSyncCtx, and SlotSyncCtxStruct::stopSignaled.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ ReplSlotSyncWorkerMain()

void ReplSlotSyncWorkerMain ( const void *  startup_data,
size_t  startup_data_len 
)

Definition at line 1465 of file slotsync.c.

1466{
1467 WalReceiverConn *wrconn = NULL;
1468 char *dbname;
1469 char *err;
1470 sigjmp_buf local_sigjmp_buf;
1471 StringInfoData app_name;
1472
1473 Assert(startup_data_len == 0);
1474
1475 MyBackendType = B_SLOTSYNC_WORKER;
1476
1477 init_ps_display(NULL);
1478
1479 Assert(GetProcessingMode() == InitProcessing);
1480
1481 /*
1482 * Create a per-backend PGPROC struct in shared memory. We must do this
1483 * before we access any shared memory.
1484 */
1485 InitProcess();
1486
1487 /*
1488 * Early initialization.
1489 */
1490 BaseInit();
1491
1492 Assert(SlotSyncCtx != NULL);
1493
1494 /*
1495 * If an exception is encountered, processing resumes here.
1496 *
1497 * We just need to clean up, report the error, and go away.
1498 *
1499 * If we do not have this handling here, then since this worker process
1500 * operates at the bottom of the exception stack, ERRORs turn into FATALs.
1501 * Therefore, we create our own exception handler to catch ERRORs.
1502 */
1503 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1504 {
1505 /* since not using PG_TRY, must reset error stack by hand */
1506 error_context_stack = NULL;
1507
1508 /* Prevents interrupts while cleaning up */
1509 HOLD_INTERRUPTS();
1510
1511 /* Report the error to the server log */
1512 EmitErrorReport();
1513
1514 /*
1515 * We can now go away. Note that because we called InitProcess, a
1516 * callback was registered to do ProcKill, which will clean up
1517 * necessary state.
1518 */
1519 proc_exit(0);
1520 }
1521
1522 /* We can now handle ereport(ERROR) */
1523 PG_exception_stack = &local_sigjmp_buf;
1524
1525 /* Setup signal handling */
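1526 pqsignal(SIGHUP, SignalHandlerForConfigReload);
1527 pqsignal(SIGINT, StatementCancelHandler);
1528 pqsignal(SIGTERM, die);
1529 pqsignal(SIGFPE, FloatExceptionHandler);
1530 pqsignal(SIGUSR1, procsignal_sigusr1_handler);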
1531 pqsignal(SIGUSR2, SIG_IGN);
1532 pqsignal(SIGPIPE, SIG_IGN);
1533 pqsignal(SIGCHLD, SIG_DFL);
1534
1535 check_and_set_sync_info(MyProcPid);
1536
1537 ereport(LOG, errmsg("slot sync worker started"));
1538
1539 /* Register it as soon as SlotSyncCtx->pid is initialized. */
1540 before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
1541
1542 /*
1543 * Establishes SIGALRM handler and initialize timeout module. It is needed
1544 * by InitPostgres to register different timeouts.
1545 */
1546 InitializeTimeouts();
1547
1548 /* Load the libpq-specific functions */
1549 load_file("libpqwalreceiver", false);
1550
1551 /*
1552 * Unblock signals (they were blocked when the postmaster forked us)
1553 */
1554 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
1555
1556 /*
1557 * Set always-secure search path, so malicious users can't redirect user
1558 * code (e.g. operators).
1559 *
1560 * It's not strictly necessary since we won't be scanning or writing to
1561 * any user table locally, but it's good to retain it here for added
1562 * precaution.
1563 */
1564 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
1565
1566 dbname = CheckAndGetDbnameFromConninfo();
1567
1568 /*
1569 * Connect to the database specified by the user in primary_conninfo. We
1570 * need a database connection for walrcv_exec to work which we use to
1571 * fetch slot information from the remote node. See comments atop
1572 * libpqrcv_exec.
1573 *
1574 * We do not specify a specific user here since the slot sync worker will
1575 * operate as a superuser. This is safe because the slot sync worker does
1576 * not interact with user tables, eliminating the risk of executing
1577 * arbitrary code within triggers.
1578 */
1579 InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
1580
1581 SetProcessingMode(NormalProcessing);
1582
1583 initStringInfo(&app_name);
1584 if (cluster_name[0])
1585 appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
1586 else
1587 appendStringInfoString(&app_name, "slotsync worker");
1588
1589 /*
1590 * Establish the connection to the primary server for slot
1591 * synchronization.
1592 */
1593 wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
1594 app_name.data, &err);
1595
1596 if (!wrconn)
1597 ereport(ERROR,
1598 errcode(ERRCODE_CONNECTION_FAILURE),
1599 errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
1600 app_name.data, err));
1601
1602 pfree(app_name.data);
1603
1604 /*
1605 * Register the disconnection callback.
1606 *
1607 * XXX: This can be combined with previous cleanup registration of
1608 * slotsync_worker_onexit() but that will need the connection to be made
1609 * global and we want to avoid introducing global for this purpose.
1610 */
1611 before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
1612
1613 /*
1614 * Using the specified primary server connection, check that we are not a
1615 * cascading standby and slot configured in 'primary_slot_name' exists on
1616 * the primary server.
1617 */
1618 validate_remote_info(wrconn);
1619
1620 /* Main loop to synchronize slots */
1621 for (;;)
1622 {
1623 bool some_slot_updated = false;
1624
1625 ProcessSlotSyncInterrupts();
1626
1627 some_slot_updated = synchronize_slots(wrconn);
1628
1629 wait_for_slot_activity(some_slot_updated);
1630 }
1631
1632 /*
1633 * The slot sync worker can't get here because it will only stop when it
1634 * receives a stop request from the startup process, or when there is an
1635 * error.
1636 */
1637 Assert(false);
1638}
sigset_t UnBlockSig
Definition: pqsignal.c:22
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:149
void EmitErrorReport(void)
Definition: elog.c:1704
ErrorContextCallback * error_context_stack
Definition: elog.c:95
sigjmp_buf * PG_exception_stack
Definition: elog.c:97
void err(int eval, const char *fmt,...)
Definition: err.c:43
int MyProcPid
Definition: globals.c:47
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4196
@ PGC_S_OVERRIDE
Definition: guc.h:123
@ PGC_SUSET
Definition: guc.h:78
char * cluster_name
Definition: guc_tables.c:555
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void pfree(void *pointer)
Definition: mcxt.c:1594
@ NormalProcessing
Definition: miscadmin.h:472
@ InitProcessing
Definition: miscadmin.h:471
#define GetProcessingMode()
Definition: miscadmin.h:481
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:134
#define SetProcessingMode(mode)
Definition: miscadmin.h:483
@ B_SLOTSYNC_WORKER
Definition: miscadmin.h:348
BackendType MyBackendType
Definition: miscinit.c:64
#define die(msg)
Definition: pg_test_fsync.c:99
#define pqsignal
Definition: port.h:551
void FloatExceptionHandler(SIGNAL_ARGS)
Definition: postgres.c:3079
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3062
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:332
uint64_t Datum
Definition: postgres.h:70
#define InvalidOid
Definition: postgres_ext.h:37
void BaseInit(void)
Definition: postinit.c:607
void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname)
Definition: postinit.c:707
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:674
void init_ps_display(const char *fixed_part)
Definition: ps_status.c:285
static void slotsync_worker_disconnect(int code, Datum arg)
Definition: slotsync.c:1322
char * CheckAndGetDbnameFromConninfo(void)
Definition: slotsync.c:1113
static void ProcessSlotSyncInterrupts(void)
Definition: slotsync.c:1287
static bool synchronize_slots(WalReceiverConn *wrconn)
Definition: slotsync.c:889
static void wait_for_slot_activity(bool some_slot_updated)
Definition: slotsync.c:1381
static void slotsync_worker_onexit(int code, Datum arg)
Definition: slotsync.c:1335
static void validate_remote_info(WalReceiverConn *wrconn)
Definition: slotsync.c:1035
static void check_and_set_sync_info(pid_t sync_process_pid)
Definition: slotsync.c:1416
void InitProcess(void)
Definition: proc.c:395
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
void InitializeTimeouts(void)
Definition: timeout.c:470
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References appendStringInfo(), appendStringInfoString(), Assert(), B_SLOTSYNC_WORKER, BaseInit(), before_shmem_exit(), check_and_set_sync_info(), CheckAndGetDbnameFromConninfo(), cluster_name, StringInfoData::data, dbname, die, EmitErrorReport(), ereport, err(), errcode(), errmsg(), ERROR, error_context_stack, FloatExceptionHandler(), GetProcessingMode, HOLD_INTERRUPTS, init_ps_display(), InitializeTimeouts(), InitPostgres(), InitProcess(), InitProcessing, initStringInfo(), InvalidOid, load_file(), LOG, MyBackendType, MyProcPid, NormalProcessing, pfree(), PG_exception_stack, PGC_S_OVERRIDE, PGC_SUSET, PointerGetDatum(), pqsignal, PrimaryConnInfo, proc_exit(), ProcessSlotSyncInterrupts(), procsignal_sigusr1_handler(), SetConfigOption(), SetProcessingMode, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, slotsync_worker_disconnect(), slotsync_worker_onexit(), SlotSyncCtx, StatementCancelHandler(), synchronize_slots(), UnBlockSig, validate_remote_info(), wait_for_slot_activity(), walrcv_connect, and wrconn.

◆ reserve_wal_for_local_slot()

static void reserve_wal_for_local_slot ( XLogRecPtr  restart_lsn)
static

Definition at line 535 of file slotsync.c.

536{
537 XLogSegNo oldest_segno;
538 XLogSegNo segno;
540
541 Assert(slot != NULL);
542 Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
543
544 while (true)
545 {
546 SpinLockAcquire(&slot->mutex);
547 slot->data.restart_lsn = restart_lsn;
548 SpinLockRelease(&slot->mutex);
549
550 /* Prevent WAL removal as fast as possible */
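551 ReplicationSlotsComputeRequiredLSN();
552
553 XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
554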
555 /*
556 * Find the oldest existing WAL segment file.
557 *
558 * Normally, we can determine it by using the last removed segment
559 * number. However, if no WAL segment files have been removed by a
560 * checkpoint since startup, we need to search for the oldest segment
561 * file from the current timeline existing in XLOGDIR.
562 *
563 * XXX: Currently, we are searching for the oldest segment in the
564 * current timeline as there is less chance of the slot's restart_lsn
565 * from being some prior timeline, and even if it happens, in the
566 * worst case, we will wait to sync till the slot's restart_lsn moved
567 * to the current timeline.
568 */
569 oldest_segno = XLogGetLastRemovedSegno() + 1;
570
571 if (oldest_segno == 1)
572 {
573 TimeLineID cur_timeline;
574
575 GetWalRcvFlushRecPtr(NULL, &cur_timeline);
576 oldest_segno = XLogGetOldestSegno(cur_timeline);
577 }
578
579 elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
580 segno, oldest_segno);
581
582 /*
583 * If all required WAL is still there, great, otherwise retry. The
584 * slot should prevent further removal of WAL, unless there's a
585 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
586 * the new restart_lsn above, so normally we should never need to loop
587 * more than twice.
588 */
589 if (segno >= oldest_segno)
590 break;
591
592 /* Retry using the location of the oldest wal segment */
593 XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
594 }
595}
#define UINT64_FORMAT
Definition: c.h:571
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1234
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3779
int wal_segment_size
Definition: xlog.c:145
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
Definition: xlog.c:3795
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
uint32 TimeLineID
Definition: xlogdefs.h:63
uint64 XLogSegNo
Definition: xlogdefs.h:52

References Assert(), ReplicationSlot::data, DEBUG1, elog, GetWalRcvFlushRecPtr(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, SpinLockRelease, UINT64_FORMAT, wal_segment_size, XLByteToSeg, XLogGetLastRemovedSegno(), XLogGetOldestSegno(), XLogRecPtrIsValid, and XLogSegNoOffsetToRecPtr.

Referenced by synchronize_one_slot().

◆ reset_syncing_flag()

static void reset_syncing_flag ( void  )
static

Definition at line 1448 of file slotsync.c.

◆ ShutDownSlotSync()

void ShutDownSlotSync ( void  )

Definition at line 1699 of file slotsync.c.

1700{
1701 pid_t sync_process_pid;
1702
1703 SpinLockAcquire(&SlotSyncCtx->mutex);
1704
1705 SlotSyncCtx->stopSignaled = true;
1706
1707 /*
1708 * Return if neither the slot sync worker is running nor the function
1709 * pg_sync_replication_slots() is executing.
1710 */
1711 if (!SlotSyncCtx->syncing)
1712 {
1713 SpinLockRelease(&SlotSyncCtx->mutex);
1714 update_synced_slots_inactive_since();
1715 return;
1716 }
1717
1718 sync_process_pid = SlotSyncCtx->pid;
1719
1720 SpinLockRelease(&SlotSyncCtx->mutex);
1721
1722 /*
1723 * Signal process doing slotsync, if any. The process will stop upon
1724 * detecting that the stopSignaled flag is set to true.
1725 */
1726 if (sync_process_pid != InvalidPid)
1727 kill(sync_process_pid, SIGUSR1);
1728
1729 /* Wait for slot sync to end */
1730 for (;;)
1731 {
1732 int rc;
1733
1734 /* Wait a bit, we don't expect to have to wait long */
1735 rc = WaitLatch(MyLatch,
1736 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1737 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1738
1739 if (rc & WL_LATCH_SET)
1740 {
1741 ResetLatch(MyLatch);
1742 CHECK_FOR_INTERRUPTS();
1743 }
1744
1745 SpinLockAcquire(&SlotSyncCtx->mutex);
1746
1747 /* Ensure that no process is syncing the slots. */
1748 if (!SlotSyncCtx->syncing)
1749 break;
1750
1751 SpinLockRelease(&SlotSyncCtx->mutex);
1752 }
1753
1754 SpinLockRelease(&SlotSyncCtx->mutex);
1755
1756 update_synced_slots_inactive_since();
1757}
struct Latch * MyLatch
Definition: globals.c:63
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
static void update_synced_slots_inactive_since(void)
Definition: slotsync.c:1647
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define kill(pid, sig)
Definition: win32_port.h:490

References CHECK_FOR_INTERRUPTS, InvalidPid, kill, SlotSyncCtxStruct::mutex, MyLatch, SlotSyncCtxStruct::pid, ResetLatch(), SIGUSR1, SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::stopSignaled, SlotSyncCtxStruct::syncing, update_synced_slots_inactive_since(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by FinishWalRecovery().

◆ slotsync_failure_callback()

static void slotsync_failure_callback ( int  code,
Datum  arg 
)
static

Definition at line 1835 of file slotsync.c.

1836{
1837 WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
1838
1839 /*
1840 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
1841 *
1842 * The startup process during promotion invokes ShutDownSlotSync() which
1843 * waits for slot sync to finish and it does that by checking the
1844 * 'syncing' flag. Thus the SQL function must be done with slots' release
1845 * and cleanup to avoid any dangling temporary slots or active slots
1846 * before it marks itself as finished syncing.
1847 */
1848
1849 /* Make sure active replication slots are released */
1850 if (MyReplicationSlot != NULL)
1851 ReplicationSlotRelease();
1852
1853 /* Also cleanup the synced temporary slots. */
1854 ReplicationSlotCleanup(true);
1855
1856 /*
1857 * The set syncing_slots indicates that the process errored out without
1858 * resetting the flag. So, we need to clean up shared memory and reset the
1859 * flag here.
1860 */
1861 if (syncing_slots)
1862 reset_syncing_flag();
1863
1864 walrcv_disconnect(wrconn);
1865}
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:322
void ReplicationSlotRelease(void)
Definition: slot.c:764
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:853
static void reset_syncing_flag(void)
Definition: slotsync.c:1448
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467

References arg, DatumGetPointer(), MyReplicationSlot, ReplicationSlotCleanup(), ReplicationSlotRelease(), reset_syncing_flag(), syncing_slots, walrcv_disconnect, and wrconn.

Referenced by SyncReplicationSlots().

◆ slotsync_reread_config()

static void slotsync_reread_config ( void  )
static

Definition at line 1207 of file slotsync.c.

1208{
1209 char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
1210 char *old_primary_slotname = pstrdup(PrimarySlotName);
1211 bool old_sync_replication_slots = sync_replication_slots;
1212 bool old_hot_standby_feedback = hot_standby_feedback;
1213 bool conninfo_changed;
1214 bool primary_slotname_changed;
1215 bool is_slotsync_worker = AmLogicalSlotSyncWorkerProcess();
1216 bool parameter_changed = false;
1217
1218 if (is_slotsync_worker)
1219 Assert(sync_replication_slots);
1220
1221 ConfigReloadPending = false;
1222 ProcessConfigFile(PGC_SIGHUP);
1223
1224 conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
1225 primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
1226 pfree(old_primary_conninfo);
1227 pfree(old_primary_slotname);
1228
1229 if (old_sync_replication_slots != sync_replication_slots)
1230 {
1231 if (is_slotsync_worker)
1232 {
1233 ereport(LOG,
1234 /* translator: %s is a GUC variable name */
1235 errmsg("replication slot synchronization worker will stop because \"%s\" is disabled",
1236 "sync_replication_slots"));
1237
1238 proc_exit(0);
1239 }
1240
1241 parameter_changed = true;
1242 }
1243 else
1244 {
1245 if (conninfo_changed ||
1246 primary_slotname_changed ||
1247 (old_hot_standby_feedback != hot_standby_feedback))
1248 {
1249
1250 if (is_slotsync_worker)
1251 {
1252 ereport(LOG,
1253 errmsg("replication slot synchronization worker will restart because of a parameter change"));
1254
1255 /*
1256 * Reset the last-start time for this worker so that the
1257 * postmaster can restart it without waiting for
1258 * SLOTSYNC_RESTART_INTERVAL_SEC.
1259 */
1260 SlotSyncCtx->last_start_time = 0;
1261
1262 proc_exit(0);
1263 }
1264
1265 parameter_changed = true;
1266 }
1267 }
1268
1269 /*
1270 * If we have reached here with a parameter change, we must be running in
1271 * SQL function, emit error in such a case.
1272 */
1273 if (parameter_changed)
1274 {
1275 Assert(!is_slotsync_worker);
1276 ereport(ERROR,
1277 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1278 errmsg("replication slot synchronization will stop because of a parameter change"));
1279 }
1280
1281}
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
char * pstrdup(const char *in)
Definition: mcxt.c:1759
bool sync_replication_slots
Definition: slotsync.c:109
time_t last_start_time
Definition: slotsync.c:102
bool hot_standby_feedback
Definition: walreceiver.c:90
char * PrimarySlotName
Definition: xlogrecovery.c:100

References AmLogicalSlotSyncWorkerProcess, Assert(), ConfigReloadPending, ereport, errcode(), errmsg(), ERROR, hot_standby_feedback, SlotSyncCtxStruct::last_start_time, LOG, pfree(), PGC_SIGHUP, PrimaryConnInfo, PrimarySlotName, proc_exit(), ProcessConfigFile(), pstrdup(), SlotSyncCtx, and sync_replication_slots.

Referenced by ProcessSlotSyncInterrupts().

◆ slotsync_worker_disconnect()

static void slotsync_worker_disconnect ( int  code,
Datum  arg 
)
static

Definition at line 1322 of file slotsync.c.

References arg, DatumGetPointer(), walrcv_disconnect, and wrconn.

Referenced by ReplSlotSyncWorkerMain().

◆ slotsync_worker_onexit()

static void slotsync_worker_onexit ( int  code,
Datum  arg 
)
static

Definition at line 1335 of file slotsync.c.

1336{
1337 /*
1338 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
1339 *
1340 * The startup process during promotion invokes ShutDownSlotSync() which
1341 * waits for slot sync to finish and it does that by checking the
1342 * 'syncing' flag. Thus the slot sync worker must be done with slots'
1343 * release and cleanup to avoid any dangling temporary slots or active
1344 * slots before it marks itself as finished syncing.
1345 */
1346
1347 /* Make sure active replication slots are released */
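1348 if (MyReplicationSlot != NULL)
1349 ReplicationSlotRelease();
1350
1351 /* Also cleanup the temporary slots. */
1352 ReplicationSlotCleanup(false);
1353
1354 SpinLockAcquire(&SlotSyncCtx->mutex);
1355
1356 SlotSyncCtx->pid = InvalidPid;
1357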
1358 /*
1359 * If syncing_slots is true, it indicates that the process errored out
1360 * without resetting the flag. So, we need to clean up shared memory and
1361 * reset the flag here.
1362 */
1363 if (syncing_slots)
1364 {
1365 SlotSyncCtx->syncing = false;
1366 syncing_slots = false;
1367 }
1368
1369 SpinLockRelease(&SlotSyncCtx->mutex);
1370}

References InvalidPid, SlotSyncCtxStruct::mutex, MyReplicationSlot, SlotSyncCtxStruct::pid, ReplicationSlotCleanup(), ReplicationSlotRelease(), SlotSyncCtx, SpinLockAcquire, SpinLockRelease, SlotSyncCtxStruct::syncing, and syncing_slots.

Referenced by ReplSlotSyncWorkerMain().

◆ SlotSyncShmemInit()

void SlotSyncShmemInit ( void  )

Definition at line 1815 of file slotsync.c.

1816{
1817 Size size = SlotSyncShmemSize();
1818 bool found;
1819
1820 SlotSyncCtx = (SlotSyncCtxStruct *)
1821 ShmemInitStruct("Slot Sync Data", size, &found);
1822
1823 if (!found)
1824 {
1825 memset(SlotSyncCtx, 0, size);
1826 SlotSyncCtx->pid = InvalidPid;
1827 SpinLockInit(&SlotSyncCtx->mutex);
1828 }
1829}
size_t Size
Definition: c.h:624
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
Size SlotSyncShmemSize(void)
Definition: slotsync.c:1806
#define SpinLockInit(lock)
Definition: spin.h:57

References InvalidPid, SlotSyncCtxStruct::mutex, SlotSyncCtxStruct::pid, ShmemInitStruct(), SlotSyncCtx, SlotSyncShmemSize(), and SpinLockInit.

Referenced by CreateOrAttachShmemStructs().

◆ SlotSyncShmemSize()

Size SlotSyncShmemSize ( void  )

Definition at line 1806 of file slotsync.c.

1807{
1808 return sizeof(SlotSyncCtxStruct);
1809}
struct SlotSyncCtxStruct SlotSyncCtxStruct

Referenced by CalculateShmemSize(), and SlotSyncShmemInit().

◆ SlotSyncWorkerCanRestart()

bool SlotSyncWorkerCanRestart ( void  )

Definition at line 1772 of file slotsync.c.

1773{
1774 time_t curtime = time(NULL);
1775
1776 /*
1777 * If first time through, or time somehow went backwards, always update
1778 * last_start_time to match the current clock and allow worker start.
1779 * Otherwise allow it only once enough time has elapsed.
1780 */
1781 if (SlotSyncCtx->last_start_time == 0 ||
1782 curtime < SlotSyncCtx->last_start_time ||
1783 (curtime - SlotSyncCtx->last_start_time) >= SLOTSYNC_RESTART_INTERVAL_SEC)
1784 {
1785 SlotSyncCtx->last_start_time = curtime;
1786 return true;
1787 }
1788 return false;
1789}
#define SLOTSYNC_RESTART_INTERVAL_SEC
Definition: slotsync.c:122

References SlotSyncCtxStruct::last_start_time, SLOTSYNC_RESTART_INTERVAL_SEC, and SlotSyncCtx.

Referenced by LaunchMissingBackgroundProcesses().

◆ synchronize_one_slot()

static bool synchronize_one_slot ( RemoteSlot remote_slot,
Oid  remote_dbid 
)
static

Definition at line 671 of file slotsync.c.

672{
673 ReplicationSlot *slot;
674 XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL);
675 bool slot_updated = false;
676
677 /* Search for the named slot */
678 if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
679 {
680 bool synced;
681
682 SpinLockAcquire(&slot->mutex);
683 synced = slot->data.synced;
684 SpinLockRelease(&slot->mutex);
685
686 /* User-created slot with the same name exists, raise ERROR. */
687 if (!synced)
688 ereport(ERROR,
689 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
690 errmsg("exiting from slot synchronization because same"
691 " name slot \"%s\" already exists on the standby",
692 remote_slot->name));
693
694 /*
695 * The slot has been synchronized before.
696 *
697 * It is important to acquire the slot here before checking
698 * invalidation. If we don't acquire the slot first, there could be a
699 * race condition that the local slot could be invalidated just after
700 * checking the 'invalidated' flag here and we could end up
701 * overwriting 'invalidated' flag to remote_slot's value. See
702 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
703 * if the slot is not acquired by other processes.
704 *
705 * XXX: If it ever turns out that slot acquire/release is costly for
706 * cases when none of the slot properties is changed then we can do a
707 * pre-check to ensure that at least one of the slot properties is
708 * changed before acquiring the slot.
709 */
710 ReplicationSlotAcquire(remote_slot->name, true, false);
711
712 Assert(slot == MyReplicationSlot);
713
714 /*
715 * Copy the invalidation cause from remote only if local slot is not
716 * invalidated locally, we don't want to overwrite existing one.
717 */
718 if (slot->data.invalidated == RS_INVAL_NONE &&
719 remote_slot->invalidated != RS_INVAL_NONE)
720 {
721 SpinLockAcquire(&slot->mutex);
722 slot->data.invalidated = remote_slot->invalidated;
723 SpinLockRelease(&slot->mutex);
724
725 /* Make sure the invalidated state persists across server restart */
728
729 slot_updated = true;
730 }
731
732 /* Skip the sync of an invalidated slot */
733 if (slot->data.invalidated != RS_INVAL_NONE)
734 {
735 update_slotsync_skip_stats(SS_SKIP_INVALID);
736
737 ReplicationSlotRelease();
738 return slot_updated;
739 }
740
741 /*
742 * Make sure that concerned WAL is received and flushed before syncing
743 * slot to target lsn received from the primary server.
744 *
745 * Report statistics only after the slot has been acquired, ensuring
746 * it cannot be dropped during the reporting process.
747 */
748 if (remote_slot->confirmed_lsn > latestFlushPtr)
749 {
750 update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
751
752 /*
753 * Can get here only if GUC 'synchronized_standby_slots' on the
754 * primary server was not configured correctly.
755 */
756 ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
757 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
758 errmsg("skipping slot synchronization because the received slot sync"
759 " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
760 LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
761 remote_slot->name,
762 LSN_FORMAT_ARGS(latestFlushPtr)));
763
764 ReplicationSlotRelease();
765
766 return slot_updated;
767 }
768
769 /* Slot not ready yet, let's attempt to make it sync-ready now. */
770 if (slot->data.persistency == RS_TEMPORARY)
771 {
772 slot_updated = update_and_persist_local_synced_slot(remote_slot,
773 remote_dbid);
774 }
775
776 /* Slot ready for sync, so sync it. */
777 else
778 {
779 /*
780 * Sanity check: As long as the invalidations are handled
781 * appropriately as above, this should never happen.
782 *
783 * We don't need to check restart_lsn here. See the comments in
784 * update_local_synced_slot() for details.
785 */
786 if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
787 ereport(ERROR,
788 errmsg_internal("cannot synchronize local slot \"%s\"",
789 remote_slot->name),
790 errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
791 LSN_FORMAT_ARGS(slot->data.confirmed_flush),
792 LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
793
794 slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
795 NULL, NULL);
796 }
797 }
798 /* Otherwise create the slot first. */
799 else
800 {
801 NameData plugin_name;
802 TransactionId xmin_horizon = InvalidTransactionId;
803
804 /* Skip creating the local slot if remote_slot is invalidated already */
805 if (remote_slot->invalidated != RS_INVAL_NONE)
806 return false;
807
808 /*
809 * We create temporary slots instead of ephemeral slots here because
810 * we want the slots to survive after releasing them. This is done to
811 * avoid dropping and re-creating the slots in each synchronization
812 * cycle if the restart_lsn or catalog_xmin of the remote slot has not
813 * caught up.
814 */
815 ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
816 remote_slot->two_phase,
817 remote_slot->failover,
818 true);
819
820 /* For shorter lines. */
821 slot = MyReplicationSlot;
822
823 /* Avoid expensive operations while holding a spinlock. */
824 namestrcpy(&plugin_name, remote_slot->plugin);
825
826 SpinLockAcquire(&slot->mutex);
827 slot->data.database = remote_dbid;
828 slot->data.plugin = plugin_name;
829 SpinLockRelease(&slot->mutex);
830
831 reserve_wal_for_local_slot(remote_slot->restart_lsn);
832
833 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
834 xmin_horizon = GetOldestSafeDecodingTransactionId(true);
835 SpinLockAcquire(&slot->mutex);
836 slot->effective_catalog_xmin = xmin_horizon;
837 slot->data.catalog_xmin = xmin_horizon;
838 SpinLockRelease(&slot->mutex);
839 ReplicationSlotsComputeRequiredXmin(true);
840 LWLockRelease(ProcArrayLock);
841
842 /*
843 * Make sure that concerned WAL is received and flushed before syncing
844 * slot to target lsn received from the primary server.
845 *
846 * Report statistics only after the slot has been acquired, ensuring
847 * it cannot be dropped during the reporting process.
848 */
849 if (remote_slot->confirmed_lsn > latestFlushPtr)
850 {
851 update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
852
853 /*
854 * Can get here only if GUC 'synchronized_standby_slots' on the
855 * primary server was not configured correctly.
856 */
857 ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
858 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
859 errmsg("skipping slot synchronization because the received slot sync"
860 " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
861 LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
862 remote_slot->name,
863 LSN_FORMAT_ARGS(latestFlushPtr)));
864
865 ReplicationSlotRelease();
866
867 return false;
868 }
869
870 update_and_persist_local_synced_slot(remote_slot, remote_dbid);
871
872 slot_updated = true;
873 }
874
875 ReplicationSlotRelease();
876
877 return slot_updated;
878}
uint32 TransactionId
Definition: c.h:671
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1243
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void namestrcpy(Name name, const char *str)
Definition: name.c:233
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition: procarray.c:2907
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:384
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1139
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1178
void ReplicationSlotSave(void)
Definition: slot.c:1121
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:546
@ RS_TEMPORARY
Definition: slot.h:47
@ SS_SKIP_WAL_NOT_FLUSHED
Definition: slot.h:83
@ SS_SKIP_INVALID
Definition: slot.h:89
static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
Definition: slotsync.c:535
static void update_slotsync_skip_stats(SlotSyncSkipReason skip_reason)
Definition: slotsync.c:159
static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
Definition: slotsync.c:606
static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
Definition: slotsync.c:198
bool two_phase
Definition: slotsync.c:140
char * plugin
Definition: slotsync.c:138
char * name
Definition: slotsync.c:137
bool failover
Definition: slotsync.c:141
ReplicationSlotInvalidationCause invalidated
Definition: slotsync.c:148
XLogRecPtr confirmed_lsn
Definition: slotsync.c:143
XLogRecPtr restart_lsn
Definition: slotsync.c:142
TransactionId catalog_xmin
Definition: slot.h:122
XLogRecPtr confirmed_flush
Definition: slot.h:136
ReplicationSlotPersistency persistency
Definition: slot.h:106
TransactionId effective_catalog_xmin
Definition: slot.h:207
Definition: c.h:760
#define InvalidTransactionId
Definition: transam.h:31
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3631
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AmLogicalSlotSyncWorkerProcess, Assert(), ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, RemoteSlot::confirmed_lsn, ReplicationSlot::data, ReplicationSlotPersistentData::database, ReplicationSlot::effective_catalog_xmin, ereport, errcode(), errdetail_internal(), errmsg(), errmsg_internal(), ERROR, RemoteSlot::failover, GetOldestSafeDecodingTransactionId(), GetStandbyFlushRecPtr(), RemoteSlot::invalidated, ReplicationSlotPersistentData::invalidated, InvalidTransactionId, LOG, LSN_FORMAT_ARGS, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, RemoteSlot::name, namestrcpy(), ReplicationSlotPersistentData::persistency, RemoteSlot::plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotAcquire(), ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredXmin(), reserve_wal_for_local_slot(), RemoteSlot::restart_lsn, RS_INVAL_NONE, RS_TEMPORARY, SearchNamedReplicationSlot(), SpinLockAcquire, SpinLockRelease, SS_SKIP_INVALID, SS_SKIP_WAL_NOT_FLUSHED, ReplicationSlotPersistentData::synced, RemoteSlot::two_phase, update_and_persist_local_synced_slot(), update_local_synced_slot(), and update_slotsync_skip_stats().

Referenced by synchronize_slots().

◆ synchronize_slots()

static bool synchronize_slots ( WalReceiverConn wrconn)
static

Definition at line 889 of file slotsync.c.

890{
891#define SLOTSYNC_COLUMN_COUNT 10
892 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
893 LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
894
895 WalRcvExecResult *res;
896 TupleTableSlot *tupslot;
897 List *remote_slot_list = NIL;
898 bool some_slot_updated = false;
899 bool started_tx = false;
900 const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
901 " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
902 " database, invalidation_reason"
903 " FROM pg_catalog.pg_replication_slots"
904 " WHERE failover and NOT temporary";
905
906 /* The syscache access in walrcv_exec() needs a transaction env. */
907 if (!IsTransactionState())
908 {
909 StartTransactionCommand();
910 started_tx = true;
911 }
912
913 /* Execute the query */
914 res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
915 if (res->status != WALRCV_OK_TUPLES)
916 ereport(ERROR,
917 errmsg("could not fetch failover logical slots info from the primary server: %s",
918 res->err));
919
920 /* Construct the remote_slot tuple and synchronize each slot locally */
921 tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
922 while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
923 {
924 bool isnull;
925 RemoteSlot *remote_slot = palloc0_object(RemoteSlot);
926 Datum d;
927 int col = 0;
928
929 remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
930 &isnull));
931 Assert(!isnull);
932
933 remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
934 &isnull));
935 Assert(!isnull);
936
937 /*
938 * It is possible to get null values for LSN and Xmin if slot is
939 * invalidated on the primary server, so handle accordingly.
940 */
941 d = slot_getattr(tupslot, ++col, &isnull);
942 remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
943 DatumGetLSN(d);
944
945 d = slot_getattr(tupslot, ++col, &isnull);
946 remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
947
948 d = slot_getattr(tupslot, ++col, &isnull);
949 remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
950 DatumGetTransactionId(d);
951
952 remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
953 &isnull));
954 Assert(!isnull);
955
956 d = slot_getattr(tupslot, ++col, &isnull);
957 remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
958
959 remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
960 &isnull));
961 Assert(!isnull);
962
963 remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
964 ++col, &isnull));
965 Assert(!isnull);
966
967 d = slot_getattr(tupslot, ++col, &isnull);
968 remote_slot->invalidated = isnull ? RS_INVAL_NONE :
969 GetSlotInvalidationCause(TextDatumGetCString(d));
970
971 /* Sanity check */
972 Assert(col == SLOTSYNC_COLUMN_COUNT);
973
974 /*
975 * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
976 * slot is valid, that means we have fetched the remote_slot in its
977 * RS_EPHEMERAL state. In such a case, don't sync it; we can always
978 * sync it in the next sync cycle when the remote_slot is persisted
979 * and has valid lsn(s) and xmin values.
980 *
981 * XXX: In future, if we plan to expose 'slot->data.persistency' in
982 * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
983 * slots in the first place.
984 */
985 if ((!XLogRecPtrIsValid(remote_slot->restart_lsn) ||
986 !XLogRecPtrIsValid(remote_slot->confirmed_lsn) ||
987 !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
988 remote_slot->invalidated == RS_INVAL_NONE)
989 pfree(remote_slot);
990 else
991 /* Create list of remote slots */
992 remote_slot_list = lappend(remote_slot_list, remote_slot);
993
994 ExecClearTuple(tupslot);
995 }
996
997 /* Drop local slots that no longer need to be synced. */
998 drop_local_obsolete_slots(remote_slot_list);
999
1000 /* Now sync the slots locally */
1001 foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
1002 {
1003 Oid remote_dbid = get_database_oid(remote_slot->database, false);
1004
1005 /*
1006 * Use shared lock to prevent a conflict with
1007 * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
1008 * a drop-database operation.
1009 */
1010 LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
1011
1012 some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
1013
1014 UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
1015 }
1016
1017 /* We are done, free remote_slot_list elements */
1018 list_free_deep(remote_slot_list);
1019
1020 walrcv_clear_result(res);
1021
1022 if (started_tx)
1023 CommitTransactionCommand();
1024
1025 return some_slot_updated;
1026}
#define TextDatumGetCString(d)
Definition: builtins.h:98
Oid get_database_oid(const char *dbname, bool missing_ok)
Definition: dbcommands.c:3168
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1427
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
#define palloc0_object(type)
Definition: fe_memutils.h:75
void list_free_deep(List *list)
Definition: list.c:1560
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
static bool DatumGetBool(Datum X)
Definition: postgres.h:100
static TransactionId DatumGetTransactionId(Datum X)
Definition: postgres.h:272
unsigned int Oid
Definition: postgres_ext.h:32
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition: slot.c:2733
static void drop_local_obsolete_slots(List *remote_slot_list)
Definition: slotsync.c:478
#define SLOTSYNC_COLUMN_COUNT
static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
Definition: slotsync.c:671
char * database
Definition: slotsync.c:139
XLogRecPtr two_phase_at
Definition: slotsync.c:144
TransactionId catalog_xmin
Definition: slotsync.c:145
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:398
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:457
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465
bool IsTransactionState(void)
Definition: xact.c:388
void StartTransactionCommand(void)
Definition: xact.c:3077
void CommitTransactionCommand(void)
Definition: xact.c:3175
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessShareLock, Assert(), RemoteSlot::catalog_xmin, CommitTransactionCommand(), RemoteSlot::confirmed_lsn, RemoteSlot::database, DatumGetBool(), DatumGetLSN(), DatumGetTransactionId(), drop_local_obsolete_slots(), ereport, WalRcvExecResult::err, errmsg(), ERROR, ExecClearTuple(), RemoteSlot::failover, foreach_ptr, get_database_oid(), GetSlotInvalidationCause(), RemoteSlot::invalidated, InvalidTransactionId, InvalidXLogRecPtr, IsTransactionState(), lappend(), list_free_deep(), LockSharedObject(), MakeSingleTupleTableSlot(), RemoteSlot::name, NIL, palloc0_object, pfree(), RemoteSlot::plugin, RemoteSlot::restart_lsn, RS_INVAL_NONE, slot_getattr(), SLOTSYNC_COLUMN_COUNT, StartTransactionCommand(), WalRcvExecResult::status, synchronize_one_slot(), TextDatumGetCString, TransactionIdIsValid, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), RemoteSlot::two_phase, RemoteSlot::two_phase_at, UnlockSharedObject(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, wrconn, and XLogRecPtrIsValid.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ SyncReplicationSlots()

void SyncReplicationSlots ( WalReceiverConn wrconn)

Definition at line 1872 of file slotsync.c.

1873{
1875 {
1877
1878 /* Check for interrupts and config changes */
1880
1882
1884
1885 /* Cleanup the synced temporary slots */
1887
1888 /* We are done with sync, so reset sync flag */
1890 }
1892}
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:52
static void slotsync_failure_callback(int code, Datum arg)
Definition: slotsync.c:1835

References check_and_set_sync_info(), MyProcPid, PG_END_ENSURE_ERROR_CLEANUP, PG_ENSURE_ERROR_CLEANUP, PointerGetDatum(), ProcessSlotSyncInterrupts(), ReplicationSlotCleanup(), reset_syncing_flag(), slotsync_failure_callback(), synchronize_slots(), validate_remote_info(), and wrconn.

Referenced by pg_sync_replication_slots().

◆ update_and_persist_local_synced_slot()

static bool update_and_persist_local_synced_slot ( RemoteSlot remote_slot,
Oid  remote_dbid 
)
static

Definition at line 606 of file slotsync.c.

607{
608 ReplicationSlot *slot = MyReplicationSlot;
609 bool found_consistent_snapshot = false;
610 bool remote_slot_precedes = false;
611
612 /* Slotsync skip stats are handled in function update_local_synced_slot() */
613 (void) update_local_synced_slot(remote_slot, remote_dbid,
614 &found_consistent_snapshot,
615 &remote_slot_precedes);
616
617 /*
618 * Check if the primary server has caught up. Refer to the comment atop
619 * the file for details on this check.
620 */
621 if (remote_slot_precedes)
622 {
623 /*
624 * The remote slot didn't catch up to locally reserved position.
625 *
626 * We do not drop the slot because the restart_lsn can be ahead of the
627 * current location when recreating the slot in the next cycle. It may
628 * take more time to create such a slot. Therefore, we keep this slot
629 * and attempt the synchronization in the next cycle.
630 */
631 return false;
632 }
633
634 /*
635 * Don't persist the slot if it cannot reach the consistent point from the
636 * restart_lsn. See comments atop this file.
637 */
638 if (!found_consistent_snapshot)
639 {
640 ereport(LOG,
641 errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
642 errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
643 LSN_FORMAT_ARGS(slot->data.restart_lsn)));
644
645 return false;
646 }
647
648 ReplicationSlotPersist();
649
650 ereport(LOG,
651 errmsg("newly created replication slot \"%s\" is sync-ready now",
652 remote_slot->name));
653
654 return true;
655}
int errdetail(const char *fmt,...)
Definition: elog.c:1216
void ReplicationSlotPersist(void)
Definition: slot.c:1156

References ReplicationSlot::data, ereport, errdetail(), errmsg(), LOG, LSN_FORMAT_ARGS, MyReplicationSlot, RemoteSlot::name, ReplicationSlotPersist(), ReplicationSlotPersistentData::restart_lsn, and update_local_synced_slot().

Referenced by synchronize_one_slot().

◆ update_local_synced_slot()

static bool update_local_synced_slot ( RemoteSlot remote_slot,
Oid  remote_dbid,
bool *  found_consistent_snapshot,
bool *  remote_slot_precedes 
)
static

Definition at line 198 of file slotsync.c.

201{
202 ReplicationSlot *slot = MyReplicationSlot;
203 bool updated_xmin_or_lsn = false;
204 bool updated_config = false;
205 SlotSyncSkipReason skip_reason = SS_SKIP_NONE;
206
207 Assert(slot->data.invalidated == RS_INVAL_NONE);
208
209 if (found_consistent_snapshot)
210 *found_consistent_snapshot = false;
211
212 if (remote_slot_precedes)
213 *remote_slot_precedes = false;
214
215 /*
216 * Don't overwrite if we already have a newer catalog_xmin and
217 * restart_lsn.
218 */
219 if (remote_slot->restart_lsn < slot->data.restart_lsn ||
220 TransactionIdPrecedes(remote_slot->catalog_xmin,
221 slot->data.catalog_xmin))
222 {
223 /* Update slot sync skip stats */
224 update_slotsync_skip_stats(SS_SKIP_WAL_OR_ROWS_REMOVED);
225
226 /*
227 * This can happen in following situations:
228 *
229 * If the slot is temporary, it means either the initial WAL location
230 * reserved for the local slot is ahead of the remote slot's
231 * restart_lsn or the initial xmin_horizon computed for the local slot
232 * is ahead of the remote slot.
233 *
234 * If the slot is persistent, both restart_lsn and catalog_xmin of the
235 * synced slot could still be ahead of the remote slot. Since we use
236 * slot advance functionality to keep snapbuild/slot updated, it is
237 * possible that the restart_lsn and catalog_xmin are advanced to a
238 * later position than it has on the primary. This can happen when
239 * slot advancing machinery finds running xacts record after reaching
240 * the consistent state at a later point than the primary where it
241 * serializes the snapshot and updates the restart_lsn.
242 *
243 * We LOG the message if the slot is temporary as it can help the user
244 * to understand why the slot is not sync-ready. In the case of a
245 * persistent slot, it would be a more common case and won't directly
246 * impact the users, so we used DEBUG1 level to log the message.
247 */
248 ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
249 errmsg("could not synchronize replication slot \"%s\"",
250 remote_slot->name),
251 errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
252 LSN_FORMAT_ARGS(remote_slot->restart_lsn),
253 remote_slot->catalog_xmin,
254 LSN_FORMAT_ARGS(slot->data.restart_lsn),
255 slot->data.catalog_xmin));
256
257 if (remote_slot_precedes)
258 *remote_slot_precedes = true;
259
260 /*
261 * Skip updating the configuration. This is required to avoid syncing
262 * two_phase_at without syncing confirmed_lsn. Otherwise, the prepared
263 * transaction between old confirmed_lsn and two_phase_at will
264 * unexpectedly get decoded and sent to the downstream after
265 * promotion. See comments in ReorderBufferFinishPrepared.
266 */
267 return false;
268 }
269
270 /*
271 * Attempt to sync LSNs and xmins only if remote slot is ahead of local
272 * slot.
273 */
274 if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
275 remote_slot->restart_lsn > slot->data.restart_lsn ||
276 TransactionIdFollows(remote_slot->catalog_xmin,
277 slot->data.catalog_xmin))
278 {
279 /*
280 * We can't directly copy the remote slot's LSN or xmin unless there
281 * exists a consistent snapshot at that point. Otherwise, after
282 * promotion, the slots may not reach a consistent point before the
283 * confirmed_flush_lsn which can lead to a data loss. To avoid data
284 * loss, we let slot machinery advance the slot which ensures that
285 * snapbuilder/slot statuses are updated properly.
286 */
287 if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
288 {
289 /*
290 * Update the slot info directly if there is a serialized snapshot
291 * at the restart_lsn, as the slot can quickly reach consistency
292 * at restart_lsn by restoring the snapshot.
293 */
294 SpinLockAcquire(&slot->mutex);
295 slot->data.restart_lsn = remote_slot->restart_lsn;
296 slot->data.confirmed_flush = remote_slot->confirmed_lsn;
297 slot->data.catalog_xmin = remote_slot->catalog_xmin;
298 SpinLockRelease(&slot->mutex);
299
300 if (found_consistent_snapshot)
301 *found_consistent_snapshot = true;
302 }
303 else
304 {
305 LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
306 found_consistent_snapshot);
307
308 /* Sanity check */
309 if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
310 ereport(ERROR,
311 errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
312 remote_slot->name),
313 errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
314 LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
315 LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
316
317 /*
318 * If we can't reach a consistent snapshot, the slot won't be
319 * persisted. See update_and_persist_local_synced_slot().
320 */
321 if (found_consistent_snapshot && !(*found_consistent_snapshot))
322 skip_reason = SS_SKIP_NO_CONSISTENT_SNAPSHOT;
323 }
324
325 updated_xmin_or_lsn = true;
326 }
327
328 /* Update slot sync skip stats */
329 update_slotsync_skip_stats(skip_reason);
330
331 if (remote_dbid != slot->data.database ||
332 remote_slot->two_phase != slot->data.two_phase ||
333 remote_slot->failover != slot->data.failover ||
334 strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
335 remote_slot->two_phase_at != slot->data.two_phase_at)
336 {
337 NameData plugin_name;
338
339 /* Avoid expensive operations while holding a spinlock. */
340 namestrcpy(&plugin_name, remote_slot->plugin);
341
342 SpinLockAcquire(&slot->mutex);
343 slot->data.plugin = plugin_name;
344 slot->data.database = remote_dbid;
345 slot->data.two_phase = remote_slot->two_phase;
346 slot->data.two_phase_at = remote_slot->two_phase_at;
347 slot->data.failover = remote_slot->failover;
348 SpinLockRelease(&slot->mutex);
349
350 updated_config = true;
351
352 /*
353 * Ensure that there is no risk of sending prepared transactions
354 * unexpectedly after the promotion.
355 */
356 Assert(slot->data.two_phase_at <= slot->data.confirmed_flush);
357 }
358
359 /*
360 * We have to write the changed xmin to disk *before* we change the
361 * in-memory value, otherwise after a crash we wouldn't know that some
362 * catalog tuples might have been removed already.
363 */
364 if (updated_config || updated_xmin_or_lsn)
365 {
366 ReplicationSlotMarkDirty();
367 ReplicationSlotSave();
368 }
369
370 /*
371 * Now the new xmin is safely on disk, we can let the global value
372 * advance. We do not take ProcArrayLock or similar since we only advance
373 * xmin here and there's not much harm done by a concurrent computation
374 * missing that.
375 */
376 if (updated_xmin_or_lsn)
377 {
378 SpinLockAcquire(&slot->mutex);
379 slot->effective_catalog_xmin = remote_slot->catalog_xmin;
380 SpinLockRelease(&slot->mutex);
381
382 ReplicationSlotsComputeRequiredXmin(false);
383 ReplicationSlotsComputeRequiredLSN();
384 }
385
386 return updated_config || updated_xmin_or_lsn;
387}
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
Definition: logical.c:2085
SlotSyncSkipReason
Definition: slot.h:81
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
Definition: slot.h:87
@ SS_SKIP_NONE
Definition: slot.h:82
@ SS_SKIP_WAL_OR_ROWS_REMOVED
Definition: slot.h:85
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
Definition: snapbuild.c:2057
static bool TransactionIdFollows(TransactionId id1, TransactionId id2)
Definition: transam.h:297
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References Assert(), RemoteSlot::catalog_xmin, ReplicationSlotPersistentData::catalog_xmin, ReplicationSlotPersistentData::confirmed_flush, RemoteSlot::confirmed_lsn, ReplicationSlot::data, ReplicationSlotPersistentData::database, DEBUG1, ReplicationSlot::effective_catalog_xmin, ereport, errdetail(), errdetail_internal(), errmsg(), errmsg_internal(), ERROR, RemoteSlot::failover, ReplicationSlotPersistentData::failover, ReplicationSlotPersistentData::invalidated, LOG, LogicalSlotAdvanceAndCheckSnapState(), LSN_FORMAT_ARGS, ReplicationSlot::mutex, MyReplicationSlot, RemoteSlot::name, NameStr, namestrcpy(), ReplicationSlotPersistentData::persistency, RemoteSlot::plugin, ReplicationSlotPersistentData::plugin, ReplicationSlotMarkDirty(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), RemoteSlot::restart_lsn, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_NONE, RS_TEMPORARY, SnapBuildSnapshotExists(), SpinLockAcquire, SpinLockRelease, SS_SKIP_NO_CONSISTENT_SNAPSHOT, SS_SKIP_NONE, SS_SKIP_WAL_OR_ROWS_REMOVED, TransactionIdFollows(), TransactionIdPrecedes(), RemoteSlot::two_phase, ReplicationSlotPersistentData::two_phase, RemoteSlot::two_phase_at, ReplicationSlotPersistentData::two_phase_at, and update_slotsync_skip_stats().

Referenced by synchronize_one_slot(), and update_and_persist_local_synced_slot().

◆ update_slotsync_skip_stats()

static void update_slotsync_skip_stats ( SlotSyncSkipReason  skip_reason)
static

Definition at line 159 of file slotsync.c.

160{
161 ReplicationSlot *slot;
162
164
165 slot = MyReplicationSlot;
166
167 /*
168 * Update the slot sync related stats in pg_stat_replication_slot when a
169 * slot sync is skipped
170 */
171 if (skip_reason != SS_SKIP_NONE)
173
174 /* Update the slot sync skip reason */
175 if (slot->slotsync_skip_reason != skip_reason)
176 {
177 SpinLockAcquire(&slot->mutex);
178 slot->slotsync_skip_reason = skip_reason;
179 SpinLockRelease(&slot->mutex);
180 }
181}
void pgstat_report_replslotsync(ReplicationSlot *slot)
SlotSyncSkipReason slotsync_skip_reason
Definition: slot.h:281

References Assert(), ReplicationSlot::mutex, MyReplicationSlot, pgstat_report_replslotsync(), ReplicationSlot::slotsync_skip_reason, SpinLockAcquire, SpinLockRelease, and SS_SKIP_NONE.

Referenced by synchronize_one_slot(), and update_local_synced_slot().

◆ update_synced_slots_inactive_since()

static void update_synced_slots_inactive_since ( void  )
static

Definition at line 1647 of file slotsync.c.

1648{
1649 TimestampTz now = 0;
1650
1651 /*
1652 * We need to update inactive_since only when we are promoting standby to
1653 * correctly interpret the inactive_since if the standby gets promoted
1654 * without a restart. We don't want the slots to appear inactive for a
1655 * long time after promotion if they haven't been synchronized recently.
1656 * Whoever acquires the slot, i.e., makes the slot active, will reset it.
1657 */
1658 if (!StandbyMode)
1659 return;
1660
1661 /* The slot sync worker or the SQL function mustn't be running by now */
1663
1664 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1665
1666 for (int i = 0; i < max_replication_slots; i++)
1667 {
1669
1670 /* Check if it is a synchronized slot */
1671 if (s->in_use && s->data.synced)
1672 {
1674
1675 /* The slot must not be acquired by any process */
1676 Assert(s->active_pid == 0);
1677
1678 /* Use the same inactive_since time for all the slots. */
1679 if (now == 0)
1681
1683 }
1684 }
1685
1686 LWLockRelease(ReplicationSlotControlLock);
1687}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
int64 TimestampTz
Definition: timestamp.h:39
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition: slot.h:303
pid_t active_pid
Definition: slot.h:189
bool StandbyMode
Definition: xlogrecovery.c:150

References ReplicationSlot::active_pid, Assert(), ReplicationSlot::data, GetCurrentTimestamp(), i, ReplicationSlot::in_use, InvalidPid, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_replication_slots, now(), SlotSyncCtxStruct::pid, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotSetInactiveSince(), SlotIsLogical, SlotSyncCtx, StandbyMode, ReplicationSlotPersistentData::synced, and SlotSyncCtxStruct::syncing.

Referenced by ShutDownSlotSync().

◆ validate_remote_info()

static void validate_remote_info ( WalReceiverConn wrconn)
static

Definition at line 1035 of file slotsync.c.

1036{
1037#define PRIMARY_INFO_OUTPUT_COL_COUNT 2
1038 WalRcvExecResult *res;
1039 Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
1040 StringInfoData cmd;
1041 bool isnull;
1042 TupleTableSlot *tupslot;
1043 bool remote_in_recovery;
1044 bool primary_slot_valid;
1045 bool started_tx = false;
1046
1047 initStringInfo(&cmd);
1048 appendStringInfo(&cmd,
1049 "SELECT pg_is_in_recovery(), count(*) = 1"
1050 " FROM pg_catalog.pg_replication_slots"
1051 " WHERE slot_type='physical' AND slot_name=%s",
1053
1054 /* The syscache access in walrcv_exec() needs a transaction env. */
1055 if (!IsTransactionState())
1056 {
1058 started_tx = true;
1059 }
1060
1062 pfree(cmd.data);
1063
1064 if (res->status != WALRCV_OK_TUPLES)
1065 ereport(ERROR,
1066 errmsg("could not fetch primary slot name \"%s\" info from the primary server: %s",
1067 PrimarySlotName, res->err),
1068 errhint("Check if \"primary_slot_name\" is configured correctly."));
1069
1071 if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
1072 elog(ERROR,
1073 "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
1074
1075 remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
1076 Assert(!isnull);
1077
1078 /*
1079 * Slot sync is currently not supported on a cascading standby. This is
1080 * because if we allow it, the primary server needs to wait for all the
1081 * cascading standbys, otherwise, logical subscribers can still be ahead
1082 * of one of the cascading standbys which we plan to promote. Thus, to
1083 * avoid this additional complexity, we restrict it for the time being.
1084 */
1085 if (remote_in_recovery)
1086 ereport(ERROR,
1087 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1088 errmsg("cannot synchronize replication slots from a standby server"));
1089
1090 primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
1091 Assert(!isnull);
1092
1093 if (!primary_slot_valid)
1094 ereport(ERROR,
1095 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1096 /* translator: second %s is a GUC variable name */
1097 errmsg("replication slot \"%s\" specified by \"%s\" does not exist on primary server",
1098 PrimarySlotName, "primary_slot_name"));
1099
1100 ExecClearTuple(tupslot);
1102
1103 if (started_tx)
1105}
int errhint(const char *fmt,...)
Definition: elog.c:1330
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
#define PRIMARY_INFO_OUTPUT_COL_COUNT

References appendStringInfo(), Assert(), CommitTransactionCommand(), StringInfoData::data, DatumGetBool(), elog, ereport, WalRcvExecResult::err, errcode(), errhint(), errmsg(), ERROR, ExecClearTuple(), initStringInfo(), IsTransactionState(), MakeSingleTupleTableSlot(), pfree(), PRIMARY_INFO_OUTPUT_COL_COUNT, PrimarySlotName, quote_literal_cstr(), slot_getattr(), StartTransactionCommand(), WalRcvExecResult::status, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and wrconn.

Referenced by ReplSlotSyncWorkerMain(), and SyncReplicationSlots().

◆ ValidateSlotSyncParams()

bool ValidateSlotSyncParams ( int  elevel)

Definition at line 1140 of file slotsync.c.

1141{
1142 /*
1143 * Logical slot sync/creation requires wal_level >= logical.
1144 */
1146 {
1147 ereport(elevel,
1148 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1149 errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\""));
1150 return false;
1151 }
1152
1153 /*
1154 * A physical replication slot(primary_slot_name) is required on the
1155 * primary to ensure that the rows needed by the standby are not removed
1156 * after restarting, so that the synchronized slot on the standby will not
1157 * be invalidated.
1158 */
1159 if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
1160 {
1161 ereport(elevel,
1162 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1163 /* translator: %s is a GUC variable name */
1164 errmsg("replication slot synchronization requires \"%s\" to be set", "primary_slot_name"));
1165 return false;
1166 }
1167
1168 /*
1169 * hot_standby_feedback must be enabled to cooperate with the physical
1170 * replication slot, which allows informing the primary about the xmin and
1171 * catalog_xmin values on the standby.
1172 */
1174 {
1175 ereport(elevel,
1176 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1177 /* translator: %s is a GUC variable name */
1178 errmsg("replication slot synchronization requires \"%s\" to be enabled",
1179 "hot_standby_feedback"));
1180 return false;
1181 }
1182
1183 /*
1184 * The primary_conninfo is required to make connection to primary for
1185 * getting slots information.
1186 */
1187 if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
1188 {
1189 ereport(elevel,
1190 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1191 /* translator: %s is a GUC variable name */
1192 errmsg("replication slot synchronization requires \"%s\" to be set",
1193 "primary_conninfo"));
1194 return false;
1195 }
1196
1197 return true;
1198}
int wal_level
Definition: xlog.c:133
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:76

References ereport, errcode(), errmsg(), hot_standby_feedback, PrimaryConnInfo, PrimarySlotName, wal_level, and WAL_LEVEL_LOGICAL.

Referenced by LaunchMissingBackgroundProcesses(), and pg_sync_replication_slots().

◆ wait_for_slot_activity()

static void wait_for_slot_activity ( bool  some_slot_updated)
static

Definition at line 1381 of file slotsync.c.

1382{
1383 int rc;
1384
1385 if (!some_slot_updated)
1386 {
1387 /*
1388 * No slots were updated, so double the sleep time, but not beyond the
1389 * maximum allowable value.
1390 */
1392 }
1393 else
1394 {
1395 /*
1396 * Some slots were updated since the last sleep, so reset the sleep
1397 * time.
1398 */
1400 }
1401
1402 rc = WaitLatch(MyLatch,
1404 sleep_ms,
1405 WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1406
1407 if (rc & WL_LATCH_SET)
1409}
#define Min(x, y)
Definition: c.h:1016
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS
Definition: slotsync.c:116
static long sleep_ms
Definition: slotsync.c:119
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS
Definition: slotsync.c:117

References MAX_SLOTSYNC_WORKER_NAPTIME_MS, Min, MIN_SLOTSYNC_WORKER_NAPTIME_MS, MyLatch, ResetLatch(), sleep_ms, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by ReplSlotSyncWorkerMain().

Variable Documentation

◆ sleep_ms

long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS
static

Definition at line 119 of file slotsync.c.

Referenced by do_watch(), and wait_for_slot_activity().

◆ SlotSyncCtx

◆ sync_replication_slots

bool sync_replication_slots = false

◆ syncing_slots