PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/subsystems.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndShmemRequest (void *arg)
 
static void WalSndShmemInit (void *arg)
 
static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static pg_noreturn void WalSndDoneImmediate (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessStandbyPSRequestMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndCheckShutdownTimeout (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void WalSndHandleConfigReload (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
const ShmemCallbacks WalSndShmemCallbacks
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
int wal_sender_shutdown_timeout = -1
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static TimestampTz shutdown_request_timestamp = 0
 
static bool shutdown_stream_done_queued = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 253 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 118 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 107 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 285 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd * cmd)
static

Definition at line 1472 of file walsender.c.

/*
 * AlterReplicationSlot -- handle the ALTER_REPLICATION_SLOT command.
 *
 * Walks the command's option list, accepting "failover" and "two_phase"
 * at most once each.  A repeated option raises "conflicting or redundant
 * options"; any other option name raises "unrecognized option: %s".
 *
 * NOTE(review): this listing is incomplete.  Source lines 1480, 1486,
 * 1489, 1498 and 1504-1506 were lost in the HTML extraction.  Judging from
 * the References list they presumably hold the foreach_ptr() loop header,
 * the errcode() arguments, the defGetBoolean() assignments and the final
 * ReplicationSlotAlter() call -- consult walsender.c itself.
 */
1473{
1474 bool failover_given = false;
1475 bool two_phase_given = false;
1476 bool failover;
1477 bool two_phase;
1478
1479 /* Parse options */
1481 {
1482 if (strcmp(defel->defname, "failover") == 0)
1483 {
1484 if (failover_given)
1485 ereport(ERROR,
1487 errmsg("conflicting or redundant options")));
1488 failover_given = true;
1490 }
1491 else if (strcmp(defel->defname, "two_phase") == 0)
1492 {
1493 if (two_phase_given)
1494 ereport(ERROR,
1496 errmsg("conflicting or redundant options")));
1497 two_phase_given = true;
1499 }
1500 else
1501 elog(ERROR, "unrecognized option: %s", defel->defname);
1502 }
1503
1507}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errcode(int sqlerrcode)
Definition elog.c:875
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
static char * errmsg
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static bool two_phase
static bool failover
static int fb(int x)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:946

References defGetBoolean(), elog, ereport, errcode(), errmsg, ERROR, failover, fb(), foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1249 of file walsender.c.

/*
 * CreateReplicationSlot -- handle the CREATE_REPLICATION_SLOT command.
 *
 * Physical slot: calls ReplicationSlotCreate(), reserves WAL when
 * reserve_wal is set (presumably via ReplicationSlotReserveWal(); that
 * body is not shown), and writes the slot to disk unless cmd->temporary.
 *
 * Logical slot: first validates the requested snapshot action.
 * SNAPSHOT 'export' must not run inside a transaction block.
 * SNAPSHOT 'use' must run inside a transaction block that is REPEATABLE
 * READ and read-only, before any query, and outside a subtransaction.
 * It then builds the initial snapshot ("might take a while") and exports
 * or uses it as requested.
 *
 * In both cases one row is sent to the client with four text columns:
 * slot_name, consistent_point, snapshot_name and output_plugin.  The last
 * two are NULL when there is no snapshot name or plugin.
 *
 * NOTE(review): this listing is incomplete.  Source lines 1257-1259,
 * 1264, 1266, 1272, 1277, 1279, 1283, 1288, 1291, 1293, 1302-1304, 1310,
 * 1320, 1328, 1358-1359, 1361, 1363-1364, 1367-1368, 1377, 1380, 1388,
 * 1390, 1392, 1396-1397, 1401, 1404, 1408, 1410, 1431, 1438, 1442, 1448,
 * 1454 and 1456 were lost in the HTML extraction.  They include the branch
 * conditions and most calls named in the References list; consult
 * walsender.c itself before relying on the exact control flow.
 */
1250{
1251 const char *snapshot_name = NULL;
1252 char xloc[MAXFNAMELEN];
1253 char *slot_name;
1254 bool reserve_wal = false;
1255 bool two_phase = false;
1256 bool failover = false;
1260 TupleDesc tupdesc;
1261 Datum values[4];
1262 bool nulls[4] = {0};
1263
1265
1267 &failover);
1268
1269 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1270 {
1271 ReplicationSlotCreate(cmd->slotname, false,
1273 false, false, false, false);
1274
1275 if (reserve_wal)
1276 {
1278
1280
1281 /* Write this slot to disk if it's a permanent one. */
1282 if (!cmd->temporary)
1284 }
1285 }
1286 else
1287 {
1289 bool need_full_snapshot = false;
1290
1292
1294
1295 /*
1296 * Initially create persistent slot as ephemeral - that allows us to
1297 * nicely handle errors during initialization because it'll get
1298 * dropped if this transaction fails. We'll make it persistent at the
1299 * end. Temporary slots can be created as temporary from beginning as
1300 * they get dropped on error as well.
1301 */
1305
1306 /*
1307 * Do options check early so that we can bail before calling the
1308 * DecodingContextFindStartpoint which can take long time.
1309 */
1311 {
1312 if (IsTransactionBlock())
1313 ereport(ERROR,
1314 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1315 (errmsg("%s must not be called inside a transaction",
1316 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1317
1318 need_full_snapshot = true;
1319 }
1321 {
1322 if (!IsTransactionBlock())
1323 ereport(ERROR,
1324 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1325 (errmsg("%s must be called inside a transaction",
1326 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1327
1329 ereport(ERROR,
1330 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1331 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1332 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1333 if (!XactReadOnly)
1334 ereport(ERROR,
1335 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1336 (errmsg("%s must be called in a read-only transaction",
1337 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1338
1339 if (FirstSnapshotSet)
1340 ereport(ERROR,
1341 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1342 (errmsg("%s must be called before any query",
1343 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1344
1345 if (IsSubTransaction())
1346 ereport(ERROR,
1347 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1348 (errmsg("%s must not be called in a subtransaction",
1349 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1350
1351 need_full_snapshot = true;
1352 }
1353
1354 /*
1355 * Ensure the logical decoding is enabled before initializing the
1356 * logical decoding context.
1357 */
1360
1362 false,
1365 .segment_open = WalSndSegmentOpen,
1366 .segment_close = wal_segment_close),
1369
1370 /*
1371 * Signal that we don't need the timeout mechanism. We're just
1372 * creating the replication slot and don't yet accept feedback
1373 * messages or send keepalives. As we possibly need to wait for
1374 * further WAL the walsender would otherwise possibly be killed too
1375 * soon.
1376 */
1378
1379 /* build initial snapshot, might take a while */
1381
1382 /*
1383 * Export or use the snapshot if we've been asked to do so.
1384 *
1385 * NB. We will convert the snapbuild.c kind of snapshot to normal
1386 * snapshot when doing this.
1387 */
1389 {
1391 }
1393 {
1394 Snapshot snap;
1395
1398 }
1399
1400 /* don't need the decoding context anymore */
1402
1403 if (!cmd->temporary)
1405 }
1406
1407 snprintf(xloc, sizeof(xloc), "%X/%08X",
1409
1411
1412 /*----------
1413 * Need a tuple descriptor representing four columns:
1414 * - first field: the slot name
1415 * - second field: LSN at which we became consistent
1416 * - third field: exported snapshot's name
1417 * - fourth field: output plugin
1418 */
1419 tupdesc = CreateTemplateTupleDesc(4);
1420 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1421 TEXTOID, -1, 0);
1422 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1423 TEXTOID, -1, 0);
1424 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1425 TEXTOID, -1, 0);
1426 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1427 TEXTOID, -1, 0);
1428 TupleDescFinalize(tupdesc);
1429
1430 /* prepare for projection of tuples */
1432
1433 /* slot_name */
1434 slot_name = NameStr(MyReplicationSlot->data.name);
1435 values[0] = CStringGetTextDatum(slot_name);
1436
1437 /* consistent wal location */
1439
1440 /* snapshot name, or NULL if none */
1441 if (snapshot_name != NULL)
1443 else
1444 nulls[2] = true;
1445
1446 /* plugin, or NULL if none */
1447 if (cmd->plugin != NULL)
1449 else
1450 nulls[3] = true;
1451
1452 /* send it to dest */
1453 do_tup_output(tstate, values, nulls);
1455
1457}
int16 AttrNumber
Definition attnum.h:21
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define NameStr(name)
Definition c.h:891
#define Assert(condition)
Definition c.h:999
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition dest.c:113
@ DestRemoteSimple
Definition dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define false
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:670
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:626
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, bool for_repack, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:322
void CheckLogicalDecodingRequirements(bool repack)
Definition logical.c:111
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:202
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:289
#define NIL
Definition pg_list.h:68
#define snprintf
Definition port.h:261
uint64_t Datum
Definition postgres.h:70
@ REPLICATION_KIND_PHYSICAL
Definition replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition replnodes.h:23
void ReplicationSlotMarkDirty(void)
Definition slot.c:1180
void ReplicationSlotReserveWal(void)
Definition slot.c:1707
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
void ReplicationSlotPersist(void)
Definition slot.c:1197
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
void ReplicationSlotSave(void)
Definition slot.c:1162
void ReplicationSlotRelease(void)
Definition slot.c:769
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:444
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition snapbuild.c:542
bool FirstSnapshotSet
Definition snapmgr.c:193
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
Definition snapmgr.c:1853
PGPROC * MyProc
Definition proc.c:71
ReplicationKind kind
Definition replnodes.h:56
struct SnapBuild * snapshot_builder
Definition logical.h:44
ReplicationSlotPersistentData data
Definition slot.h:213
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:165
void TupleDescFinalize(TupleDesc tupdesc)
Definition tupdesc.c:511
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:985
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition walsender.c:1172
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition walsender.c:3266
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1634
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition walsender.c:1758
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition walsender.c:1077
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1607
static TimestampTz last_reply_timestamp
Definition walsender.c:204
CRSSnapshotAction
Definition walsender.h:21
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition walsender.h:22
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
bool IsSubTransaction(void)
Definition xact.c:5098
bool IsTransactionBlock(void)
Definition xact.c:5025
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:855

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), EnsureLogicalDecodingEnabled(), ereport, errmsg, ERROR, failover, fb(), FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsLogicalDecodingEnabled(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1463 of file walsender.c.

/*
 * DropReplicationSlot -- handle the DROP_REPLICATION_SLOT command.
 *
 * Drops the slot named by cmd->slotname.  The command's wait flag is
 * inverted into ReplicationSlotDrop()'s "nowait" parameter, so nowait is
 * true exactly when cmd->wait is false.
 */
1464{
1465 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1466}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:915

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char * cmd_string)

Definition at line 2087 of file walsender.c.

/*
 * exec_replication_command -- parse and execute one command received by a
 * WAL sender.
 *
 * Returns true after executing a replication command: IDENTIFY_SYSTEM,
 * READ_REPLICATION_SLOT, BASE_BACKUP, CREATE/DROP/ALTER_REPLICATION_SLOT,
 * START_REPLICATION, TIMELINE_HISTORY, SHOW or UPLOAD_MANIFEST.
 *
 * Returns false when cmd_string is not a replication command, so the
 * caller can handle it as SQL.  If MyDatabaseId is InvalidOid, that case
 * raises an ERROR instead: "cannot execute SQL commands in WAL sender for
 * physical replication".
 *
 * An ERROR is also raised:
 *   - when the walsender is in stopping mode;
 *   - when the parser returns non-zero;
 *   - when the current transaction is aborted;
 *   - for an unrecognized command node tag.
 *
 * Work happens in a per-walsender "Replication command context".  It is
 * created once, reset for each command, and cleaned out again before the
 * function returns true.
 *
 * NOTE(review): this listing is incomplete.  Many source lines were lost
 * in the HTML extraction, including most case labels of the switch, the
 * stopping-mode condition, and the parser call.  Consult walsender.c
 * itself for the exact code.
 */
2088{
2089 yyscan_t scanner;
2090 int parse_rc;
2091 Node *cmd_node;
2092 const char *cmdtag;
2094
2095 /* We save and re-use the cmd_context across calls */
2097
2098 /*
2099 * If WAL sender has been told that shutdown is getting close, switch its
2100 * status accordingly to handle the next replication commands correctly.
2101 */
2102 if (got_STOPPING)
2104
2105 /*
2106 * Throw error if in stopping mode. We need prevent commands that could
2107 * generate WAL while the shutdown checkpoint is being written. To be
2108 * safe, we just prohibit all new commands.
2109 */
2111 ereport(ERROR,
2113 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2114
2115 /*
2116 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2117 * command arrives. Clean up the old stuff if there's anything.
2118 */
2120
2122
2123 /*
2124 * Prepare to parse and execute the command.
2125 *
2126 * Because replication command execution can involve beginning or ending
2127 * transactions, we need a working context that will survive that, so we
2128 * make it a child of TopMemoryContext. That in turn creates a hazard of
2129 * long-lived memory leaks if we lose track of the working context. We
2130 * deal with that by creating it only once per walsender, and resetting it
2131 * for each new command. (Normally this reset is a no-op, but if the
2132 * prior exec_replication_command call failed with an error, it won't be.)
2133 *
2134 * This is subtler than it looks. The transactions we manage can extend
2135 * across replication commands, indeed SnapBuildClearExportedSnapshot
2136 * might have just ended one. Because transaction exit will revert to the
2137 * memory context that was current at transaction start, we need to be
2138 * sure that that context is still valid. That motivates re-using the
2139 * same cmd_context rather than making a new one each time.
2140 */
2141 if (cmd_context == NULL)
2143 "Replication command context",
2145 else
2147
2149
2151
2152 /*
2153 * Is it a WalSender command?
2154 */
2156 {
2157 /* Nope; clean up and get out. */
2159
2162
2163 /* XXX this is a pretty random place to make this check */
2164 if (MyDatabaseId == InvalidOid)
2165 ereport(ERROR,
2167 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2168
2169 /* Tell the caller that this wasn't a WalSender command. */
2170 return false;
2171 }
2172
2173 /*
2174 * Looks like a WalSender command, so parse it.
2175 */
2177 if (parse_rc != 0)
2178 ereport(ERROR,
2180 errmsg_internal("replication command parser returned %d",
2181 parse_rc)));
2183
2184 /*
2185 * Report query to various monitoring facilities. For this purpose, we
2186 * report replication commands just like SQL commands.
2187 */
2189
2191
2192 /*
2193 * Log replication command if log_replication_commands is enabled. Even
2194 * when it's disabled, log the command with DEBUG1 level for backward
2195 * compatibility.
2196 */
2198 (errmsg("received replication command: %s", cmd_string)));
2199
2200 /*
2201 * Disallow replication commands in aborted transaction blocks.
2202 */
2204 ereport(ERROR,
2206 errmsg("current transaction is aborted, "
2207 "commands ignored until end of transaction block")));
2208
2210
2211 /*
2212 * Allocate buffers that will be used for each outgoing and incoming
2213 * message. We do this just once per command to reduce palloc overhead.
2214 */
2218
2219 switch (cmd_node->type)
2220 {
2222 cmdtag = "IDENTIFY_SYSTEM";
2226 break;
2227
2229 cmdtag = "READ_REPLICATION_SLOT";
2233 break;
2234
2235 case T_BaseBackupCmd:
2236 cmdtag = "BASE_BACKUP";
2241 break;
2242
2244 cmdtag = "CREATE_REPLICATION_SLOT";
2248 break;
2249
2251 cmdtag = "DROP_REPLICATION_SLOT";
2255 break;
2256
2258 cmdtag = "ALTER_REPLICATION_SLOT";
2262 break;
2263
2265 {
2267
2268 cmdtag = "START_REPLICATION";
2271
2272 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2273 StartReplication(cmd);
2274 else
2276
2277 /* dupe, but necessary per libpqrcv_endstreaming */
2279
2281 break;
2282 }
2283
2285 cmdtag = "TIMELINE_HISTORY";
2290 break;
2291
2292 case T_VariableShowStmt:
2293 {
2296
2297 cmdtag = "SHOW";
2299
2300 /* syscache access needs a transaction environment */
2302 GetPGVariable(n->name, dest);
2305 }
2306 break;
2307
2309 cmdtag = "UPLOAD_MANIFEST";
2314 break;
2315
2316 default:
2317 elog(ERROR, "unrecognized replication command node tag: %u",
2318 cmd_node->type);
2319 }
2320
2321 /*
2322 * Done. Revert to caller's memory context, and clean out the cmd_context
2323 * to recover memory right away.
2324 */
2327
2328 /*
2329 * We need not update ps display or pg_stat_activity, because PostgresMain
2330 * will reset those to "idle". But we must reset debug_query_string to
2331 * ensure it doesn't become a dangling pointer.
2332 */
2334
2335 return true;
2336}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition basebackup.c:992
void * yyscan_t
Definition cubedata.h:65
void EndReplicationCommand(const char *commandTag)
Definition dest.c:217
#define LOG
Definition elog.h:32
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define DEBUG1
Definition elog.h:31
Oid MyDatabaseId
Definition globals.c:96
void GetPGVariable(const char *name, DestReceiver *dest)
Definition guc_funcs.c:410
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:406
MemoryContext TopMemoryContext
Definition mcxt.c:167
MemoryContext CurrentMemoryContext
Definition mcxt.c:161
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
const char * debug_query_string
Definition postgres.c:94
#define InvalidOid
static void set_ps_display(const char *activity)
Definition ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
void replication_scanner_finish(yyscan_t yyscanner)
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
void SnapBuildClearExportedSnapshot(void)
Definition snapbuild.c:603
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition nodes.h:133
ReplicationKind kind
Definition replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition walsender.c:1472
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition walsender.c:611
WalSnd * MyWalSnd
Definition walsender.c:132
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition walsender.c:511
static StringInfoData tmpbuf
Definition walsender.c:195
static void IdentifySystem(void)
Definition walsender.c:429
static StringInfoData reply_message
Definition walsender.c:194
void WalSndSetState(WalSndState state)
Definition walsender.c:4177
static StringInfoData output_message
Definition walsender.c:193
static void UploadManifest(void)
Definition walsender.c:702
static volatile sig_atomic_t got_STOPPING
Definition walsender.c:233
bool log_replication_commands
Definition walsender.c:150
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition walsender.c:1249
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition walsender.c:1514
static IncrementalBackupInfo * uploaded_manifest
Definition walsender.c:172
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition walsender.c:1463
static void StartReplication(StartReplicationCmd *cmd)
Definition walsender.c:844
static XLogReaderState * xlogreader
Definition walsender.c:162
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3701
void StartTransactionCommand(void)
Definition xact.c:3112
bool IsAbortedTransactionBlockState(void)
Definition xact.c:409
void CommitTransactionCommand(void)
Definition xact.c:3210

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg, errmsg_internal(), ERROR, fb(), GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)

Definition at line 3880 of file walsender.c.

/*
 * GetStandbyFlushRecPtr -- how far WAL may be sent while in recovery.
 *
 * Starts from the WAL replay position returned by GetXLogReplayRecPtr().
 * If tli is not NULL, *tli is set to the replay timeline.  When the
 * walreceiver's timeline equals the replay timeline and receivePtr is
 * ahead of the replay position, the hidden line 3904 presumably advances
 * the result to receivePtr, as the comment inside explains.
 *
 * NOTE(review): source lines 3882, 3884-3886, 3888, 3896 and 3904 were
 * lost in the HTML extraction.  Per the References list they presumably:
 *   - declare receivePtr, receiveTLI and result;
 *   - Assert the caller context (am_cascading_walsender /
 *     IsSyncingReplicationSlots());
 *   - obtain receivePtr/receiveTLI from GetWalRcvFlushRecPtr();
 *   - assign result = receivePtr.
 */
3881{
3883 TimeLineID replayTLI;
3887
3889
3890 /*
3891 * We can safely send what's already been replayed. Also, if walreceiver
3892 * is streaming WAL from the same timeline, we can send anything that it
3893 * has streamed, but hasn't been replayed yet.
3894 */
3895
3897 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3898
3899 if (tli)
3900 *tli = replayTLI;
3901
3902 result = replayPtr;
3903 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3905
3906 return result;
3907}
uint32 result
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1919
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition walsender.c:136
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, fb(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), receiveTLI, and result.

Referenced by IdentifySystem(), StartReplication(), update_local_synced_slot(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t offset,
IncrementalBackupInfo ib 
)
static

Definition at line 768 of file walsender.c.

770{
771 int mtype;
772 int maxmsglen;
773
775
777 mtype = pq_getbyte();
778 if (mtype == EOF)
781 errmsg("unexpected EOF on client connection with an open transaction")));
782
783 switch (mtype)
784 {
785 case PqMsg_CopyData:
787 break;
788 case PqMsg_CopyDone:
789 case PqMsg_CopyFail:
790 case PqMsg_Flush:
791 case PqMsg_Sync:
793 break;
794 default:
797 errmsg("unexpected message type 0x%02X during COPY from stdin",
798 mtype)));
799 maxmsglen = 0; /* keep compiler quiet */
800 break;
801 }
802
803 /* Now collect the message body */
807 errmsg("unexpected EOF on client connection with an open transaction")));
809
810 /* Process the message */
811 switch (mtype)
812 {
813 case PqMsg_CopyData:
815 return true;
816
817 case PqMsg_CopyDone:
818 return false;
819
820 case PqMsg_Sync:
821 case PqMsg_Flush:
822 /* Ignore these while in CopyOut mode as we do elsewhere. */
823 return true;
824
825 case PqMsg_CopyFail:
828 errmsg("COPY from stdin failed: %s",
830 }
831
832 /* Not reached. */
833 Assert(false);
834 return false;
835}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
#define PQ_SMALL_MESSAGE_LIMIT
Definition libpq.h:33
#define PQ_LARGE_MESSAGE_LIMIT
Definition libpq.h:34
#define HOLD_CANCEL_INTERRUPTS()
Definition miscadmin.h:144
#define RESUME_CANCEL_INTERRUPTS()
Definition miscadmin.h:146
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pq_getmessage(StringInfo s, int maxlen)
Definition pqcomm.c:1204
int pq_getbyte(void)
Definition pqcomm.c:964
void pq_startmsgread(void)
Definition pqcomm.c:1142
const char * pq_getmsgstring(StringInfo msg)
Definition pqformat.c:578
#define PqMsg_CopyDone
Definition protocol.h:64
#define PqMsg_CopyData
Definition protocol.h:65
#define PqMsg_Sync
Definition protocol.h:27
#define PqMsg_CopyFail
Definition protocol.h:29
#define PqMsg_Flush
Definition protocol.h:24

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, ERROR, fb(), HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_CopyFail, PqMsg_Flush, PqMsg_Sync, and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3936 of file walsender.c.

3937{
3939
3940 /*
3941 * If replication has not yet started, die as with SIGTERM. If
3942 * replication is active, only set a flag and wake up the main loop. It
3943 * will send any outstanding WAL, wait for it to be replicated to the
3944 * standby, and then exit gracefully.
3945 */
3946 if (!replication_active)
3948 else
3949 got_STOPPING = true;
3950
3951 /* latch will be set by procsignal_sigusr1_handler */
3952}
int MyProcPid
Definition globals.c:49
bool am_walsender
Definition walsender.c:135
static volatile sig_atomic_t replication_active
Definition walsender.c:241
#define kill(pid, sig)
Definition win32_port.h:490

References am_walsender, Assert, fb(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 429 of file walsender.c.

430{
431 char sysid[32];
432 char xloc[MAXFNAMELEN];
434 char *dbname = NULL;
437 TupleDesc tupdesc;
438 Datum values[4];
439 bool nulls[4] = {0};
440 TimeLineID currTLI;
441
442 /*
443 * Reply with a result set with one row, four columns. First col is system
444 * ID, second is timeline ID, third is current xlog location and the
445 * fourth contains the database name if we are connected to one.
446 */
447
450
453 logptr = GetStandbyFlushRecPtr(&currTLI);
454 else
455 logptr = GetFlushRecPtr(&currTLI);
456
457 snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
458
460 {
462
463 /* syscache access needs a transaction env. */
466 /* copy dbname out of TX context */
469 }
470
472
473 /* need a tuple descriptor representing four columns */
474 tupdesc = CreateTemplateTupleDesc(4);
475 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
476 TEXTOID, -1, 0);
477 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
478 INT8OID, -1, 0);
479 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
480 TEXTOID, -1, 0);
481 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
482 TEXTOID, -1, 0);
483 TupleDescFinalize(tupdesc);
484
485 /* prepare for projection of tuples */
487
488 /* column 1: system identifier */
490
491 /* column 2: timeline */
492 values[1] = Int64GetDatum(currTLI);
493
494 /* column 3: wal location */
496
497 /* column 4: database name, or NULL if none */
498 if (dbname)
500 else
501 nulls[3] = true;
502
503 /* send it to dest */
504 do_tup_output(tstate, values, nulls);
505
507}
#define UINT64_FORMAT
Definition c.h:691
struct cursor * cur
Definition ecpg.c:29
char * get_database_name(Oid dbid)
Definition lsyscache.c:1384
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1897
static Datum Int64GetDatum(int64 X)
Definition postgres.h:426
char * dbname
Definition streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition walsender.c:3880
uint64 GetSystemIdentifier(void)
Definition xlog.c:4643
bool RecoveryInProgress(void)
Definition xlog.c:6836
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:7001

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), fb(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 330 of file walsender.c.

331{
333
334 /* Create a per-walsender data structure in shared memory */
336
337 /* need resource owner for e.g. basebackups */
339
340 /*
341 * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
342 * a WAL sender process, postmaster will let us outlive the bgwriter and
343 * kill us last in the shutdown sequence, so we get a chance to stream all
344 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
345 * there's no going back, and we mustn't write any WAL records after this.
346 */
349
350 /*
351 * If the client didn't specify a database to connect to, show in PGPROC
352 * that our advertised xmin should affect vacuum horizons in all
353 * databases. This allows physical replication clients to send hot
354 * standby feedback that will delay vacuum cleanup in all databases.
355 */
357 {
363 }
364
365 /* Initialize empty timestamp buffer for lag tracking. */
367}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1269
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:164
void MarkPostmasterChildWalSender(void)
Definition pmsignal.c:308
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition pmsignal.h:44
#define PROC_AFFECTS_ALL_HORIZONS
Definition proc.h:66
void CreateAuxProcessResourceOwner(void)
Definition resowner.c:1006
PROC_HDR * ProcGlobal
Definition proc.c:74
TransactionId xmin
Definition proc.h:242
uint8 statusFlags
Definition proc.h:210
int pgxactoff
Definition proc.h:207
uint8 * statusFlags
Definition proc.h:456
#define InvalidTransactionId
Definition transam.h:31
static void InitWalSenderSlot(void)
Definition walsender.c:3174
static LagTracker * lag_tracker
Definition walsender.c:279

References am_cascading_walsender, Assert, CreateAuxProcessResourceOwner(), fb(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 3174 of file walsender.c.

3175{
3176 int i;
3177
3178 /*
3179 * WalSndCtl should be set up already (we inherit this by fork() or
3180 * EXEC_BACKEND mechanism from the postmaster).
3181 */
3182 Assert(WalSndCtl != NULL);
3183 Assert(MyWalSnd == NULL);
3184
3185 /*
3186 * Find a free walsender slot and reserve it. This must not fail due to
3187 * the prior check for free WAL senders in InitProcess().
3188 */
3189 for (i = 0; i < max_wal_senders; i++)
3190 {
3192
3193 SpinLockAcquire(&walsnd->mutex);
3194
3195 if (walsnd->pid != 0)
3196 {
3197 SpinLockRelease(&walsnd->mutex);
3198 continue;
3199 }
3200 else
3201 {
3202 /*
3203 * Found a free slot. Reserve it for us.
3204 */
3205 walsnd->pid = MyProcPid;
3206 walsnd->state = WALSNDSTATE_STARTUP;
3207 walsnd->sentPtr = InvalidXLogRecPtr;
3208 walsnd->needreload = false;
3209 walsnd->write = InvalidXLogRecPtr;
3210 walsnd->flush = InvalidXLogRecPtr;
3211 walsnd->apply = InvalidXLogRecPtr;
3212 walsnd->writeLag = -1;
3213 walsnd->flushLag = -1;
3214 walsnd->applyLag = -1;
3215 walsnd->sync_standby_priority = 0;
3216 walsnd->replyTime = 0;
3217
3218 /*
3219 * The kind assignment is done here and not in StartReplication()
3220 * and StartLogicalReplication(). Indeed, the logical walsender
3221 * needs to read WAL records (like snapshot of running
3222 * transactions) during the slot creation. So it needs to be woken
3223 * up based on its kind.
3224 *
3225 * The kind assignment could also be done in StartReplication(),
3226 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
3227 * seems better to set it in one place.
3228 */
3229 if (MyDatabaseId == InvalidOid)
3231 else
3233
3234 SpinLockRelease(&walsnd->mutex);
3235 /* don't need the lock anymore */
3236 MyWalSnd = walsnd;
3237
3238 break;
3239 }
3240 }
3241
3242 Assert(MyWalSnd != NULL);
3243
3244 /* Arrange to clean up at walsender exit */
3246}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
int max_wal_senders
Definition walsender.c:141
static void WalSndKill(int code, Datum arg)
Definition walsender.c:3250
WalSndCtlData * WalSndCtl
Definition walsender.c:121
@ WALSNDSTATE_STARTUP

References Assert, fb(), i, InvalidOid, InvalidXLogRecPtr, max_wal_senders, MyDatabaseId, MyProcPid, MyWalSnd, on_shmem_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, and WALSNDSTATE_STARTUP.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4521 of file walsender.c.

4522{
4523 TimestampTz time = 0;
4524
4525 /*
4526 * If 'lsn' has not passed the WAL position stored in the overflow entry,
4527 * return the elapsed time (in microseconds) since the saved local flush
4528 * time. If the flush time is in the future (due to clock drift), return
4529 * -1 to treat as no valid sample.
4530 *
4531 * Otherwise, switch back to using the buffer to control the read head and
4532 * compute the elapsed time. The read head is then reset to point to the
4533 * oldest entry in the buffer.
4534 */
4535 if (lag_tracker->read_heads[head] == -1)
4536 {
4537 if (lag_tracker->overflowed[head].lsn > lsn)
4538 return (now >= lag_tracker->overflowed[head].time) ?
4539 now - lag_tracker->overflowed[head].time : -1;
4540
4541 time = lag_tracker->overflowed[head].time;
4543 lag_tracker->read_heads[head] =
4545 }
4546
4547 /* Read all unread samples up to this LSN or end of buffer. */
4548 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4550 {
4552 lag_tracker->last_read[head] =
4554 lag_tracker->read_heads[head] =
4556 }
4557
4558 /*
4559 * If the lag tracker is empty, that means the standby has processed
4560 * everything we've ever sent so we should now clear 'last_read'. If we
4561 * didn't do that, we'd risk using a stale and irrelevant sample for
4562 * interpolation at the beginning of the next burst of WAL after a period
4563 * of idleness.
4564 */
4566 lag_tracker->last_read[head].time = 0;
4567
4568 if (time > now)
4569 {
4570 /* If the clock somehow went backwards, treat as not found. */
4571 return -1;
4572 }
4573 else if (time == 0)
4574 {
4575 /*
4576 * We didn't cross a time. If there is a future sample that we
4577 * haven't reached yet, and we've already reached at least one sample,
4578 * let's interpolate the local flushed time. This is mainly useful
4579 * for reporting a completely stuck apply position as having
4580 * increasing lag, since otherwise we'd have to wait for it to
4581 * eventually start moving again and cross one of our samples before
4582 * we can show the lag increasing.
4583 */
4585 {
4586 /* There are no future samples, so we can't interpolate. */
4587 return -1;
4588 }
4589 else if (lag_tracker->last_read[head].time != 0)
4590 {
4591 /* We can interpolate between last_read and the next sample. */
4592 double fraction;
4593 WalTimeSample prev = lag_tracker->last_read[head];
4595
4596 if (lsn < prev.lsn)
4597 {
4598 /*
4599 * Reported LSNs shouldn't normally go backwards, but it's
4600 * possible when there is a timeline change. Treat as not
4601 * found.
4602 */
4603 return -1;
4604 }
4605
4606 Assert(prev.lsn < next.lsn);
4607
4608 if (prev.time > next.time)
4609 {
4610 /* If the clock somehow went backwards, treat as not found. */
4611 return -1;
4612 }
4613
4614 /* See how far we are between the previous and next samples. */
4615 fraction =
4616 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4617
4618 /* Scale the local flush time proportionally. */
4619 time = (TimestampTz)
4620 ((double) prev.time + (next.time - prev.time) * fraction);
4621 }
4622 else
4623 {
4624 /*
4625 * We have only a future sample, implying that we were entirely
4626 * caught up, but now there is a new burst of WAL and the
4627 * standby hasn't processed the first sample yet. Until the
4628 * standby reaches the future sample the best we can do is report
4629 * the hypothetical lag if that sample were to be replayed now.
4630 */
4632 }
4633 }
4634
4635 /* Return the elapsed time since local flush time in microseconds. */
4636 Assert(time != 0);
4637 return now - time;
4638}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
static int32 next
Definition blutils.c:225
int64 TimestampTz
Definition timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition walsender.c:259
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:261
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:262
int write_head
Definition walsender.c:260
WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:276
TimestampTz time
Definition walsender.c:249
XLogRecPtr lsn
Definition walsender.c:248
#define LAG_TRACKER_BUFFER_SIZE
Definition walsender.c:253

References Assert, LagTracker::buffer, fb(), lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4463 of file walsender.c.

4464{
4465 int new_write_head;
4466 int i;
4467
4468 if (!am_walsender)
4469 return;
4470
4471 /*
4472 * If the lsn hasn't advanced since last time, then do nothing. This way
4473 * we only record a new sample when new WAL has been written.
4474 */
4475 if (lag_tracker->last_lsn == lsn)
4476 return;
4477 lag_tracker->last_lsn = lsn;
4478
4479 /*
4480 * If advancing the write head of the circular buffer would crash into any
4481 * of the read heads, then the buffer is full. In other words, the
4482 * slowest reader (presumably apply) is the one that controls the release
4483 * of space.
4484 */
4486 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4487 {
4488 /*
4489 * If the buffer is full, move the slowest reader to a separate
4490 * overflow entry and free its space in the buffer so the write head
4491 * can advance.
4492 */
4494 {
4497 lag_tracker->read_heads[i] = -1;
4498 }
4499 }
4500
4501 /* Store a sample at the current write head position. */
4505}
XLogRecPtr last_lsn
Definition walsender.c:258
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27

References am_walsender, LagTracker::buffer, fb(), i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char cur_page 
)
static

Definition at line 1077 of file walsender.c.

1079{
1081 int count;
1083 XLogSegNo segno;
1084 TimeLineID currTLI;
1085
1086 /*
1087 * Make sure we have enough WAL available before retrieving the current
1088 * timeline.
1089 */
1091
1092 /* Fail if not enough (implies we are going to shut down) */
1094 return -1;
1095
1096 /*
1097 * Since logical decoding is also permitted on a standby server, we need
1098 * to check if the server is in recovery to decide how to get the current
1099 * timeline ID (so that it also covers the promotion or timeline change
1100 * cases). We must determine am_cascading_walsender after waiting for the
1101 * required WAL so that it is correct when the walsender wakes up after a
1102 * promotion.
1103 */
1105
1107 {
1109
1110 /*
1111 * If the insertion timeline has already been set, use it.
1112 * InsertTimeLineID is set before the WAL segments of the old timeline
1113 * are removed, before SharedRecoveryState switches to
1114 * RECOVERY_STATE_DONE.
1115 *
1116 * There is a window where RecoveryInProgress() still returns true but
1117 * the old timeline's WAL segments have already been removed or
1118 * recycled. Using the WAL insertion timeline avoids attempting to
1119 * read from those removed segments, improving availability, and is a
1120 * safe thing to do as promotion copies the contents in the last
1121 * segment of the old timeline to the first segment of the new
1122 * timeline, up to the switchpoint.
1123 */
1125 if (insertTLI != 0)
1126 currTLI = insertTLI;
1127 else
1128 GetXLogReplayRecPtr(&currTLI);
1129 }
1130 else
1131 currTLI = GetWALInsertionTimeLine();
1132
1134 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1135 sendTimeLine = state->currTLI;
1136 sendTimeLineValidUpto = state->currTLIValidUntil;
1137 sendTimeLineNextTLI = state->nextTLI;
1138
1140 count = XLOG_BLCKSZ; /* more than one block available */
1141 else
1142 count = flushptr - targetPagePtr; /* part of the page available */
1143
1144 /* now actually read the data, we know it's there */
1145 if (!WALRead(state,
1146 cur_page,
1148 count,
1149 currTLI, /* Pass the current TLI because only
1150 * WalSndSegmentOpen controls whether new TLI
1151 * is needed. */
1152 &errinfo))
1154
1155 /*
1156 * After reading into the buffer, check that what we read was valid. We do
1157 * this after reading, because even though the segment was present when we
1158 * opened it, it might get recycled or removed while we read it. The
1159 * read() succeeds in that case, but the data we tried to read might
1160 * already have been overwritten with new WAL records.
1161 */
1162 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1163 CheckXLogRemoved(segno, state->seg.ws_tli);
1164
1165 return count;
1166}
static TimeLineID sendTimeLine
Definition walsender.c:181
static bool sendTimeLineIsHistoric
Definition walsender.c:183
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition walsender.c:1908
static TimeLineID sendTimeLineNextTLI
Definition walsender.c:182
static XLogRecPtr sendTimeLineValidUpto
Definition walsender.c:184
TimeLineID GetWALInsertionTimeLine(void)
Definition xlog.c:7022
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition xlog.c:3778
TimeLineID GetWALInsertionTimeLineIfSet(void)
Definition xlog.c:7038
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition xlogdefs.h:52
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition xlogutils.c:731
void WALReadRaiseError(WALReadError *errinfo)
Definition xlogutils.c:1047

References am_cascading_walsender, CheckXLogRemoved(), fb(), GetWALInsertionTimeLine(), GetWALInsertionTimeLineIfSet(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 wait_event 
)
static

Definition at line 1848 of file walsender.c.

1849{
1850 int elevel = got_STOPPING ? ERROR : WARNING;
1851 bool failover_slot;
1852
1854
1855 /*
1856 * Note that after receiving the shutdown signal, an ERROR is reported if
1857 * any slots are dropped, invalidated, or inactive. This measure is taken
1858 * to prevent the walsender from waiting indefinitely.
1859 */
1861 {
1863 return true;
1864 }
1865
1866 *wait_event = 0;
1867 return false;
1868}
#define WARNING
Definition elog.h:37
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3109

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, fb(), got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 wait_event 
)
static

Definition at line 1880 of file walsender.c.

1882{
1883 /* Check if we need to wait for WALs to be flushed to disk */
1884 if (target_lsn > flushed_lsn)
1885 {
1887 return true;
1888 }
1889
1890 /* Check if the standby slots have caught up to the flushed position */
1892}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1848

References fb(), and NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 4215 of file walsender.c.

4216{
4218
4219 result->month = 0;
4220 result->day = 0;
4221 result->time = offset;
4222
4223 return result;
4224}
#define palloc_object(type)
Definition fe_memutils.h:89

References palloc_object, and result.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action,
bool two_phase,
bool failover 
)
static

Definition at line 1172 of file walsender.c.

1176{
1177 ListCell *lc;
1178 bool snapshot_action_given = false;
1179 bool reserve_wal_given = false;
1180 bool two_phase_given = false;
1181 bool failover_given = false;
1182
1183 /* Parse options */
1184 foreach(lc, cmd->options)
1185 {
1186 DefElem *defel = (DefElem *) lfirst(lc);
1187
1188 if (strcmp(defel->defname, "snapshot") == 0)
1189 {
1190 char *action;
1191
1193 ereport(ERROR,
1195 errmsg("conflicting or redundant options")));
1196
1198 snapshot_action_given = true;
1199
1200 if (strcmp(action, "export") == 0)
1202 else if (strcmp(action, "nothing") == 0)
1204 else if (strcmp(action, "use") == 0)
1206 else
1207 ereport(ERROR,
1209 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
1210 "CREATE_REPLICATION_SLOT", defel->defname, action)));
1211 }
1212 else if (strcmp(defel->defname, "reserve_wal") == 0)
1213 {
1215 ereport(ERROR,
1217 errmsg("conflicting or redundant options")));
1218
1219 reserve_wal_given = true;
1221 }
1222 else if (strcmp(defel->defname, "two_phase") == 0)
1223 {
1225 ereport(ERROR,
1227 errmsg("conflicting or redundant options")));
1228 two_phase_given = true;
1230 }
1231 else if (strcmp(defel->defname, "failover") == 0)
1232 {
1234 ereport(ERROR,
1236 errmsg("conflicting or redundant options")));
1237 failover_given = true;
1239 }
1240 else
1241 elog(ERROR, "unrecognized option: %s", defel->defname);
1242 }
1243}
char * defGetString(DefElem *def)
Definition define.c:34
#define lfirst(lc)
Definition pg_list.h:172
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), elog, ereport, errcode(), errmsg, ERROR, failover, fb(), CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 4231 of file walsender.c.

4232{
4233#define PG_STAT_GET_WAL_SENDERS_COLS 12
4234 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4236 int num_standbys;
4237 int i;
4238
4239 InitMaterializedSRF(fcinfo, 0);
4240
4241 /*
4242 * Get the currently active synchronous standbys. This could be out of
4243 * date before we're done, but we'll use the data anyway.
4244 */
4246
4247 for (i = 0; i < max_wal_senders; i++)
4248 {
4252 XLogRecPtr flush;
4253 XLogRecPtr apply;
4254 TimeOffset writeLag;
4255 TimeOffset flushLag;
4256 TimeOffset applyLag;
4257 int priority;
4258 int pid;
4260 TimestampTz replyTime;
4261 bool is_sync_standby;
4263 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
4264 int j;
4265
4266 /* Collect data from shared memory */
4267 SpinLockAcquire(&walsnd->mutex);
4268 if (walsnd->pid == 0)
4269 {
4270 SpinLockRelease(&walsnd->mutex);
4271 continue;
4272 }
4273 pid = walsnd->pid;
4274 sent_ptr = walsnd->sentPtr;
4275 state = walsnd->state;
4276 write = walsnd->write;
4277 flush = walsnd->flush;
4278 apply = walsnd->apply;
4279 writeLag = walsnd->writeLag;
4280 flushLag = walsnd->flushLag;
4281 applyLag = walsnd->applyLag;
4282 priority = walsnd->sync_standby_priority;
4283 replyTime = walsnd->replyTime;
4284 SpinLockRelease(&walsnd->mutex);
4285
4286 /*
4287 * Detect whether walsender is/was considered synchronous. We can
4288 * provide some protection against stale data by checking the PID
4289 * along with walsnd_index.
4290 */
4291 is_sync_standby = false;
4292 for (j = 0; j < num_standbys; j++)
4293 {
4294 if (sync_standbys[j].walsnd_index == i &&
4295 sync_standbys[j].pid == pid)
4296 {
4297 is_sync_standby = true;
4298 break;
4299 }
4300 }
4301
4302 values[0] = Int32GetDatum(pid);
4303
4305 {
4306 /*
4307 * Only superusers and roles with privileges of pg_read_all_stats
4308 * can see details. Other users only get the pid value to know
4309 * it's a walsender, but no details.
4310 */
4311 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4312 }
4313 else
4314 {
4316
4318 nulls[2] = true;
4320
4322 nulls[3] = true;
4323 values[3] = LSNGetDatum(write);
4324
4325 if (!XLogRecPtrIsValid(flush))
4326 nulls[4] = true;
4327 values[4] = LSNGetDatum(flush);
4328
4329 if (!XLogRecPtrIsValid(apply))
4330 nulls[5] = true;
4331 values[5] = LSNGetDatum(apply);
4332
4333 /*
4334 * Treat a standby such as a pg_basebackup background process
4335 * which always returns an invalid flush location, as an
4336 * asynchronous standby.
4337 */
4338 priority = XLogRecPtrIsValid(flush) ? priority : 0;
4339
4340 if (writeLag < 0)
4341 nulls[6] = true;
4342 else
4344
4345 if (flushLag < 0)
4346 nulls[7] = true;
4347 else
4349
4350 if (applyLag < 0)
4351 nulls[8] = true;
4352 else
4354
4356
4357 /*
4358 * More easily understood version of standby state. This is purely
4359 * informational.
4360 *
4361 * In quorum-based sync replication, the role of each standby
4362 * listed in synchronous_standby_names can be changing very
4363 * frequently. Any standbys considered as "sync" at one moment can
4364 * be switched to "potential" ones at the next moment. So, it's
4365 * basically useless to report "sync" or "potential" as their sync
4366 * states. We report just "quorum" for them.
4367 */
4368 if (priority == 0)
4369 values[10] = CStringGetTextDatum("async");
4370 else if (is_sync_standby)
4372 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4373 else
4374 values[10] = CStringGetTextDatum("potential");
4375
4376 if (replyTime == 0)
4377 nulls[11] = true;
4378 else
4379 values[11] = TimestampTzGetDatum(replyTime);
4380 }
4381
4382 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4383 values, nulls);
4384 }
4385
4386 return (Datum) 0;
4387}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
#define MemSet(start, val, len)
Definition c.h:1163
int64 TimeOffset
Definition timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
Definition funcapi.c:76
#define write(a, b, c)
Definition win32.h:14
int j
Definition isn.c:78
Oid GetUserId(void)
Definition miscinit.c:470
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
uint8 syncrep_method
Definition syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:764
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition walsender.c:4215
static const char * WalSndGetStateString(WalSndState state)
Definition walsender.c:4196
WalSndState
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References CStringGetTextDatum, fb(), GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, SpinLockAcquire(), SpinLockRelease(), SYNC_REP_PRIORITY, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, write, and XLogRecPtrIsValid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2494 of file walsender.c.

2495{
2496 bool changed = false;
2498
2500 SpinLockAcquire(&slot->mutex);
2501 if (slot->data.restart_lsn != lsn)
2502 {
2503 changed = true;
2504 slot->data.restart_lsn = lsn;
2505 }
2506 SpinLockRelease(&slot->mutex);
2507
2508 if (changed)
2509 {
2513 }
2514
2515 /*
2516 * One could argue that the slot should be saved to disk now, but that'd
2517 * be energy wasted - the worst thing lost information could cause here is
2518 * to give wrong information in a statistics view - we'll just potentially
2519 * be more conservative in removing files.
2520 */
2521}
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1304
slock_t mutex
Definition slot.h:183
void PhysicalWakeupLogicalWalSnd(void)
Definition walsender.c:1823

References Assert, ReplicationSlot::data, fb(), ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2635 of file walsender.c.

2636{
2637 bool changed = false;
2639
2640 SpinLockAcquire(&slot->mutex);
2642
2643 /*
2644 * For physical replication we don't need the interlock provided by xmin
2645 * and effective_xmin since the consequences of a missed increase are
2646 * limited to query cancellations, so set both at once.
2647 */
2648 if (!TransactionIdIsNormal(slot->data.xmin) ||
2651 {
2652 changed = true;
2653 slot->data.xmin = feedbackXmin;
2655 }
2659 {
2660 changed = true;
2663 }
2664 SpinLockRelease(&slot->mutex);
2665
2666 if (changed)
2667 {
2670 }
2671}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1222
TransactionId catalog_xmin
Definition slot.h:122
TransactionId effective_catalog_xmin
Definition slot.h:210
TransactionId effective_xmin
Definition slot.h:209
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire(), SpinLockRelease(), TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1823 of file walsender.c.

1824{
1826
1827 /*
1828 * If we are running in a standby, there is no need to wake up walsenders.
1829 * This is because we do not support syncing slots to cascading standbys,
1830 * so, there are no walsenders waiting for standbys to catch up.
1831 */
1832 if (RecoveryInProgress())
1833 return;
1834
1837}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3076
#define SlotIsPhysical(slot)
Definition slot.h:287
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1702 of file walsender.c.

1703{
1704 for (;;)
1705 {
1706 long sleeptime;
1707
1708 /* Check for input from the client */
1710
1711 /* die if timeout was reached */
1713
1714 /*
1715 * During shutdown, die if the shutdown timeout expires. Call this
1716 * before WalSndComputeSleeptime() so the timeout is considered when
1717 * computing sleep time.
1718 */
1720
1721 /* Send keepalive if the time has come */
1723
1724 if (!pq_is_send_pending())
1725 break;
1726
1728
1729 /* Sleep until something happens or we time out */
1732
1733 /* Clear any already-pending wakeups */
1735
1737
1738 /* Process any requests or signals received recently */
1740
1741 /* Try to flush pending output to the client */
1742 if (pq_flush_if_writable() != 0)
1744 }
1745
1746 /* reactivate latch so WalSndLoop knows to continue */
1748}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
struct Latch * MyLatch
Definition globals.c:65
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
#define pq_flush_if_writable()
Definition libpq.h:50
#define pq_is_send_pending()
Definition libpq.h:51
#define WL_SOCKET_READABLE
#define WL_SOCKET_WRITEABLE
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition walsender.c:4055
static void WalSndCheckTimeOut(void)
Definition walsender.c:2964
static void ProcessRepliesIfAny(void)
Definition walsender.c:2343
static void WalSndKeepaliveIfNecessary(void)
Definition walsender.c:4425
static void WalSndCheckShutdownTimeout(void)
Definition walsender.c:2994
static void WalSndHandleConfigReload(void)
Definition walsender.c:1679
static pg_noreturn void WalSndShutdown(void)
Definition walsender.c:413
static long WalSndComputeSleeptime(TimestampTz now)
Definition walsender.c:2907

References CHECK_FOR_INTERRUPTS, fb(), GetCurrentTimestamp(), MyLatch, pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), SetLatch(), WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2343 of file walsender.c.

2344{
2345 unsigned char firstchar;
2346 int maxmsglen;
2347 int r;
2348 bool received = false;
2349
2351
2352 /*
2353 * If we already received a CopyDone from the frontend, any subsequent
2354 * message is the beginning of a new command, and should be processed in
2355 * the main processing loop.
2356 */
2357 while (!streamingDoneReceiving)
2358 {
2361 if (r < 0)
2362 {
2363 /* unexpected error or EOF */
2366 errmsg("unexpected EOF on standby connection")));
2367 proc_exit(0);
2368 }
2369 if (r == 0)
2370 {
2371 /* no data available without blocking */
2372 pq_endmsgread();
2373 break;
2374 }
2375
2376 /* Validate message type and set packet size limit */
2377 switch (firstchar)
2378 {
2379 case PqMsg_CopyData:
2381 break;
2382 case PqMsg_CopyDone:
2383 case PqMsg_Terminate:
2385 break;
2386 default:
2387 ereport(FATAL,
2389 errmsg("invalid standby message type \"%c\"",
2390 firstchar)));
2391 maxmsglen = 0; /* keep compiler quiet */
2392 break;
2393 }
2394
2395 /* Read the message contents */
2398 {
2401 errmsg("unexpected EOF on standby connection")));
2402 proc_exit(0);
2403 }
2404
2405 /* ... and process it */
2406 switch (firstchar)
2407 {
2408 /*
2409 * PqMsg_CopyData means a standby reply wrapped in a CopyData
2410 * packet.
2411 */
2412 case PqMsg_CopyData:
2414 received = true;
2415 break;
2416
2417 /*
2418 * PqMsg_CopyDone means the standby requested to finish
2419 * streaming. Reply with CopyDone, if we had not sent that
2420 * already.
2421 */
2422 case PqMsg_CopyDone:
2424 {
2426 streamingDoneSending = true;
2427 }
2428
2430 received = true;
2431 break;
2432
2433 /*
2434 * PqMsg_Terminate means that the standby is closing down the
2435 * socket.
2436 */
2437 case PqMsg_Terminate:
2438 proc_exit(0);
2439
2440 default:
2441 Assert(false); /* NOT REACHED */
2442 }
2443 }
2444
2445 /*
2446 * Save the last reply timestamp if we've received at least one reply.
2447 */
2448 if (received)
2449 {
2452 }
2453}
#define COMMERROR
Definition elog.h:34
#define FATAL
Definition elog.h:42
void proc_exit(int code)
Definition ipc.c:105
#define pq_putmessage_noblock(msgtype, s, len)
Definition libpq.h:54
int pq_getbyte_if_available(unsigned char *c)
Definition pqcomm.c:1004
void pq_endmsgread(void)
Definition pqcomm.c:1166
#define PqMsg_Terminate
Definition protocol.h:28
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static bool waiting_for_ping_response
Definition walsender.c:207
static TimestampTz last_processing
Definition walsender.c:198
static bool streamingDoneSending
Definition walsender.c:225
static void ProcessStandbyMessage(void)
Definition walsender.c:2459
static bool streamingDoneReceiving
Definition walsender.c:226

References Assert, COMMERROR, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, FATAL, fb(), GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2715 of file walsender.c.

2716{
2721 TimestampTz replyTime;
2722
2723 /*
2724 * Decipher the reply message. The caller already consumed the msgtype
2725 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2726 * of this message.
2727 */
2728 replyTime = pq_getmsgint64(&reply_message);
2733
2735 {
2736 char *replyTimeStr;
2737
2738 /* Copy because timestamptz_to_str returns a static buffer */
2740
2741 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2746 replyTimeStr);
2747
2749 }
2750
2751 /*
2752 * Update shared state for this WalSender process based on reply data from
2753 * standby.
2754 */
2755 {
2757
2758 SpinLockAcquire(&walsnd->mutex);
2759 walsnd->replyTime = replyTime;
2760 SpinLockRelease(&walsnd->mutex);
2761 }
2762
2763 /*
2764 * Unset WalSender's xmins if the feedback message values are invalid.
2765 * This happens when the downstream turned hot_standby_feedback off.
2766 */
2769 {
2771 if (MyReplicationSlot != NULL)
2773 return;
2774 }
2775
2776 /*
2777 * Check that the provided xmin/epoch are sane, that is, not in the future
2778 * and not so far back as to be already wrapped around. Ignore if not.
2779 */
2782 return;
2783
2786 return;
2787
2788 /*
2789 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2790 * the xmin will be taken into account by GetSnapshotData() /
2791 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2792 * thereby prevent the generation of cleanup conflicts on the standby
2793 * server.
2794 *
2795 * There is a small window for a race condition here: although we just
2796 * checked that feedbackXmin precedes nextXid, the nextXid could have
2797 * gotten advanced between our fetching it and applying the xmin below,
2798 * perhaps far enough to make feedbackXmin wrap around. In that case the
2799 * xmin we set here would be "in the future" and have no effect. No point
2800 * in worrying about this since it's too late to save the desired data
2801 * anyway. Assuming that the standby sends us an increasing sequence of
2802 * xmins, this could only happen during the first reply cycle, else our
2803 * own xmin would prevent nextXid from advancing so far.
2804 *
2805 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2806 * is assumed atomic, and there's no real need to prevent concurrent
2807 * horizon determinations. (If we're moving our xmin forward, this is
2808 * obviously safe, and if we're moving it backwards, well, the data is at
2809 * risk already since a VACUUM could already have determined the horizon.)
2810 *
2811 * If we're using a replication slot we reserve the xmin via that,
2812 * otherwise via the walsender's PGPROC entry. We can only track the
2813 * catalog xmin separately when using a slot, so we store the least of the
2814 * two provided when not using a slot.
2815 *
2816 * XXX: It might make sense to generalize the ephemeral slot concept and
2817 * always use the slot mechanism to handle the feedback xmin.
2818 */
2819 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2821 else
2822 {
2826 else
2828 }
2829}
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1870
uint32_t uint32
Definition c.h:680
uint32 TransactionId
Definition c.h:792
bool message_level_is_interesting(int elevel)
Definition elog.c:285
#define DEBUG2
Definition elog.h:30
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition walsender.c:2635
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition walsender.c:2684

References DEBUG2, elog, fb(), InvalidTransactionId, message_level_is_interesting(), MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, SpinLockAcquire(), SpinLockRelease(), timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2459 of file walsender.c.

2460{
2461 char msgtype;
2462
2463 /*
2464 * Check message type from the first byte.
2465 */
2467
2468 switch (msgtype)
2469 {
2472 break;
2473
2476 break;
2477
2480 break;
2481
2482 default:
2485 errmsg("unexpected message type \"%c\"", msgtype)));
2486 proc_exit(0);
2487 }
2488}
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static void ProcessStandbyHSFeedbackMessage(void)
Definition walsender.c:2715
static void ProcessStandbyPSRequestMessage(void)
Definition walsender.c:2835
static void ProcessStandbyReplyMessage(void)
Definition walsender.c:2527

References COMMERROR, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, fb(), pq_getmsgbyte(), PqReplMsg_HotStandbyFeedback, PqReplMsg_PrimaryStatusRequest, PqReplMsg_StandbyStatusUpdate, proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyPSRequestMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyPSRequestMessage()

static void ProcessStandbyPSRequestMessage ( void  )
static

Definition at line 2835 of file walsender.c.

2836{
2843 TimestampTz replyTime;
2844
2845 /*
2846 * This shouldn't happen because we don't support getting primary status
2847 * message from standby.
2848 */
2849 if (RecoveryInProgress())
2850 elog(ERROR, "the primary status is unavailable during recovery");
2851
2852 replyTime = pq_getmsgint64(&reply_message);
2853
2854 /*
2855 * Update shared state for this WalSender process based on reply data from
2856 * standby.
2857 */
2858 SpinLockAcquire(&walsnd->mutex);
2859 walsnd->replyTime = replyTime;
2860 SpinLockRelease(&walsnd->mutex);
2861
2862 /*
2863 * Consider transactions in the current database, as only these are the
2864 * ones replicated.
2865 */
2868
2869 /*
2870 * Update the oldest xid for standby transmission if an older prepared
2871 * transaction exists and is currently in commit phase.
2872 */
2876
2880 lsn = GetXLogWriteRecPtr();
2881
2882 elog(DEBUG2, "sending primary status");
2883
2884 /* construct the message... */
2891
2892 /* ... and send it wrapped in CopyData */
2894}
int64_t int64
Definition c.h:677
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2832
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
Definition transam.h:441
#define U64FromFullTransactionId(x)
Definition transam.h:49
#define TransactionIdIsValid(xid)
Definition transam.h:41
TransactionId TwoPhaseGetOldestXidInCommit(void)
Definition twophase.c:2835
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:283
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:10128

References StringInfoData::data, DEBUG2, elog, ERROR, fb(), FullTransactionIdFromAllowableAt(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), GetXLogWriteRecPtr(), InvalidXLogRecPtr, StringInfoData::len, MyWalSnd, output_message, pq_getmsgint64(), pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_PrimaryStatusUpdate, ReadNextFullTransactionId(), RecoveryInProgress(), reply_message, resetStringInfo(), SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, TransactionIdPrecedes(), TwoPhaseGetOldestXidInCommit(), and U64FromFullTransactionId.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2527 of file walsender.c.

2528{
2530 flushPtr,
2531 applyPtr;
2532 bool replyRequested;
2533 TimeOffset writeLag,
2534 flushLag,
2535 applyLag;
2536 bool clearLagTimes;
2538 TimestampTz replyTime;
2539
2543
2544 /* the caller already consumed the msgtype byte */
2548 replyTime = pq_getmsgint64(&reply_message);
2550
2552 {
2553 char *replyTimeStr;
2554
2555 /* Copy because timestamptz_to_str returns a static buffer */
2557
2558 elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
2562 replyRequested ? " (reply requested)" : "",
2563 replyTimeStr);
2564
2566 }
2567
2568 /* See if we can compute the round-trip lag for these positions. */
2573
2574 /*
2575 * If the standby reports that it has fully replayed the WAL, and the
2576 * write/flush/apply positions remain unchanged across two consecutive
2577 * reply messages, forget the lag times measured when it last
2578 * wrote/flushed/applied a WAL record.
2579 *
2580 * The second message with unchanged positions typically results from
2581 * wal_receiver_status_interval expiring on the standby, so lag values are
2582 * usually cleared after that interval when there is no activity. This
2583 * avoids displaying stale lag data until more WAL traffic arrives.
2584 */
2588
2592
2593 /* Send a reply if the standby requested one. */
2594 if (replyRequested)
2596
2597 /*
2598 * Update shared state for this WalSender process based on reply data from
2599 * standby.
2600 */
2601 {
2603
2604 SpinLockAcquire(&walsnd->mutex);
2605 walsnd->write = writePtr;
2606 walsnd->flush = flushPtr;
2607 walsnd->apply = applyPtr;
2608 if (writeLag != -1 || clearLagTimes)
2609 walsnd->writeLag = writeLag;
2610 if (flushLag != -1 || clearLagTimes)
2611 walsnd->flushLag = flushLag;
2612 if (applyLag != -1 || clearLagTimes)
2613 walsnd->applyLag = applyLag;
2614 walsnd->replyTime = replyTime;
2615 SpinLockRelease(&walsnd->mutex);
2616 }
2617
2620
2621 /*
2622 * Advance our local xmin horizon when the client confirmed a flush.
2623 */
2625 {
2628 else
2630 }
2631}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition logical.c:1813
#define SlotIsLogical(slot)
Definition slot.h:288
void SyncRepReleaseWaiters(void)
Definition syncrep.c:484
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
static XLogRecPtr sentPtr
Definition walsender.c:190
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition walsender.c:2494
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition walsender.c:4402
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition walsender.c:4521

References am_cascading_walsender, DEBUG2, elog, fb(), GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, sentPtr, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 511 of file walsender.c.

512{
513#define READ_REPLICATION_SLOT_COLS 3
514 ReplicationSlot *slot;
517 TupleDesc tupdesc;
519 bool nulls[READ_REPLICATION_SLOT_COLS];
520
522 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
523 TEXTOID, -1, 0);
524 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
525 TEXTOID, -1, 0);
526 /* TimeLineID is unsigned, so int4 is not wide enough. */
527 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
528 INT8OID, -1, 0);
529 TupleDescFinalize(tupdesc);
530
531 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
532
534 slot = SearchNamedReplicationSlot(cmd->slotname, false);
535 if (slot == NULL || !slot->in_use)
536 {
538 }
539 else
540 {
542 int i = 0;
543
544 /* Copy slot contents while holding spinlock */
545 SpinLockAcquire(&slot->mutex);
546 slot_contents = *slot;
547 SpinLockRelease(&slot->mutex);
549
550 if (OidIsValid(slot_contents.data.database))
553 errmsg("cannot use %s with a logical replication slot",
554 "READ_REPLICATION_SLOT"));
555
556 /* slot type */
557 values[i] = CStringGetTextDatum("physical");
558 nulls[i] = false;
559 i++;
560
561 /* start LSN */
562 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
563 {
564 char xloc[64];
565
566 snprintf(xloc, sizeof(xloc), "%X/%08X",
567 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
569 nulls[i] = false;
570 }
571 i++;
572
573 /* timeline this WAL was produced on */
574 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
575 {
579
580 /*
581 * While in recovery, use as timeline the currently-replaying one
582 * to get the LSN position's history.
583 */
584 if (RecoveryInProgress())
586 else
588
593 nulls[i] = false;
594 }
595 i++;
596
598 }
599
602 do_tup_output(tstate, values, nulls);
604}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition timeline.c:77
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition timeline.c:545
#define OidIsValid(objectId)
Definition c.h:914
@ LW_SHARED
Definition lwlock.h:105
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:548
Definition pg_list.h:54
bool in_use
Definition slot.h:186
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg, ERROR, fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire(), SpinLockRelease(), tliOfPointInHistory(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsValid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 611 of file walsender.c.

612{
614 TupleDesc tupdesc;
617 char path[MAXPGPATH];
618 int fd;
621 Size len;
622
624
625 /*
626 * Reply with a result set with one row, and two columns. The first col is
627 * the name of the history file, 2nd is the contents.
628 */
629 tupdesc = CreateTemplateTupleDesc(2);
630 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
631 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
632 TupleDescFinalize(tupdesc);
633
635 TLHistoryFilePath(path, cmd->timeline);
636
637 /* Send a RowDescription message */
638 dest->rStartup(dest, CMD_SELECT, tupdesc);
639
640 /* Send a DataRow message */
642 pq_sendint16(&buf, 2); /* # of columns */
644 pq_sendint32(&buf, len); /* col1 len */
646
648 if (fd < 0)
651 errmsg("could not open file \"%s\": %m", path)));
652
653 /* Determine file length and send it to client */
655 if (histfilelen < 0)
658 errmsg("could not seek to end of file \"%s\": %m", path)));
659 if (lseek(fd, 0, SEEK_SET) != 0)
662 errmsg("could not seek to beginning of file \"%s\": %m", path)));
663
664 pq_sendint32(&buf, histfilelen); /* col2 len */
665
667 while (bytesleft > 0)
668 {
670 int nread;
671
673 nread = read(fd, rbuf.data, sizeof(rbuf));
675 if (nread < 0)
678 errmsg("could not read file \"%s\": %m",
679 path)));
680 else if (nread == 0)
683 errmsg("could not read file \"%s\": read %d of %zu",
684 path, nread, (Size) bytesleft)));
685
686 pq_sendbytes(&buf, rbuf.data, nread);
687 bytesleft -= nread;
688 }
689
690 if (CloseTransientFile(fd) != 0)
693 errmsg("could not close file \"%s\": %m", path)));
694
696}
#define PG_BINARY
Definition c.h:1442
size_t Size
Definition c.h:745
int errcode_for_file_access(void)
Definition elog.c:898
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define read(a, b, c)
Definition win32.h:13
@ CMD_SELECT
Definition nodes.h:273
#define ERRCODE_DATA_CORRUPTED
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition pqformat.h:136
static int fd(const char *x, int i)
#define PqMsg_DataRow
Definition protocol.h:43
TimeLineID timeline
Definition replnodes.h:120
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), TupleDescFinalize(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1514 of file walsender.c.

1515{
1517 QueryCompletion qc;
1518
1519 /* make sure that our requirements are still fulfilled */
1521
1523
1524 ReplicationSlotAcquire(cmd->slotname, true, true);
1525
1526 /*
1527 * Force a disconnect, so that the decoding code doesn't need to care
1528 * about an eventual switch from running in recovery, to running in a
1529 * normal environment. Client code is expected to handle reconnects.
1530 */
1532 {
1533 ereport(LOG,
1534 (errmsg("terminating walsender process after promotion")));
1535 got_STOPPING = true;
1536 }
1537
1538 /*
1539 * Create our decoding context, making it start at the previously ack'ed
1540 * position.
1541 *
1542 * Do this before sending a CopyBothResponse message, so that any errors
1543 * are reported early.
1544 */
1546 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1548 .segment_open = WalSndSegmentOpen,
1549 .segment_close = wal_segment_close),
1553
1555
1556 /* Send a CopyBothResponse message, and start streaming */
1558 pq_sendbyte(&buf, 0);
1559 pq_sendint16(&buf, 0);
1561 pq_flush();
1562
1563 /* Start reading WAL from the oldest required WAL. */
1566
1567 /*
1568 * Report the location after which we'll send out further commits as the
1569 * current sentPtr.
1570 */
1572
1573 /* Also update the sent position status in shared memory */
1577
1578 replication_active = true;
1579
1581
1582 /* Main loop of walsender */
1584
1587
1588 replication_active = false;
1589 if (got_STOPPING)
1590 proc_exit(0);
1592
1593 /* Get out of COPY mode (CommandComplete). */
1595 EndCommand(&qc, DestRemote, false);
1596}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition dest.c:205
@ DestRemote
Definition dest.h:89
#define pq_flush()
Definition libpq.h:49
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:491
#define PqMsg_CopyBothResponse
Definition protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:629
XLogReaderState * reader
Definition logical.h:42
XLogRecPtr startpoint
Definition replnodes.h:97
slock_t mutex
XLogRecPtr sentPtr
void SyncRepInitConfig(void)
Definition syncrep.c:455
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition walsender.c:3030
static LogicalDecodingContext * logical_decoding_ctx
Definition walsender.c:243
static void XLogSendLogical(void)
Definition walsender.c:3654
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition xlogreader.c:233

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg, fb(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire(), SpinLockRelease(), StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 844 of file walsender.c.

845{
849
850 /* create xlogreader for physical replication */
851 xlogreader =
853 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
854 .segment_close = wal_segment_close),
855 NULL);
856
857 if (!xlogreader)
860 errmsg("out of memory"),
861 errdetail("Failed while allocating a WAL reading processor.")));
862
863 /*
864 * We assume here that we're logging enough information in the WAL for
865 * log-shipping, since this is checked in PostmasterMain().
866 *
867 * NOTE: wal_level can only change at shutdown, so in most cases it is
868 * difficult for there to be WAL data that we can still see that was
869 * written at wal_level='minimal'.
870 */
871
872 if (cmd->slotname)
873 {
874 ReplicationSlotAcquire(cmd->slotname, true, true);
878 errmsg("cannot use a logical replication slot for physical replication")));
879
880 /*
881 * We don't need to verify the slot's restart_lsn here; instead we
882 * rely on the caller requesting the starting point to use. If the
883 * WAL segment doesn't exist, we'll fail later.
884 */
885 }
886
887 /*
888 * Select the timeline. If it was given explicitly by the client, use
889 * that. Otherwise use the timeline of the last replayed record.
890 */
894 else
896
897 if (cmd->timeline != 0)
898 {
900
901 sendTimeLine = cmd->timeline;
902 if (sendTimeLine == FlushTLI)
903 {
906 }
907 else
908 {
910
912
913 /*
914 * Check that the timeline the client requested exists, and the
915 * requested start location is on that timeline.
916 */
921
922 /*
923 * Found the requested timeline in the history. Check that
924 * requested startpoint is on that timeline in our history.
925 *
926 * This is quite loose on purpose. We only check that we didn't
927 * fork off the requested timeline before the switchpoint. We
928 * don't check that we switched *to* it before the requested
929 * starting point. This is because the client can legitimately
930 * request to start replication from the beginning of the WAL
931 * segment that contains switchpoint, but on the new timeline, so
932 * that it doesn't end up with a partial segment. If you ask for
933 * too old a starting point, you'll get an error later when we
934 * fail to find the requested WAL segment in pg_wal.
935 *
936 * XXX: we could be more strict here and only allow a startpoint
937 * that's older than the switchpoint, if it's still in the same
938 * WAL segment.
939 */
941 switchpoint < cmd->startpoint)
942 {
944 errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
946 cmd->timeline),
947 errdetail("This server's history forked from timeline %u at %X/%08X.",
948 cmd->timeline,
950 }
952 }
953 }
954 else
955 {
959 }
960
962
963 /* If there is nothing to stream, don't even enter COPY mode */
965 {
966 /*
967 * When we first start replication the standby will be behind the
968 * primary. For some applications, for example synchronous
969 * replication, it is important to have a clear state for this initial
970 * catchup mode, so we can trigger actions when we change streaming
971 * state later. We may stay in this state for a long time, which is
972 * exactly why we want to be able to monitor whether or not we are
973 * still here.
974 */
976
977 /* Send a CopyBothResponse message, and start streaming */
979 pq_sendbyte(&buf, 0);
980 pq_sendint16(&buf, 0);
982 pq_flush();
983
984 /*
985 * Don't allow a request to stream from a future point in WAL that
986 * hasn't been flushed to disk in this server yet.
987 */
988 if (FlushPtr < cmd->startpoint)
989 {
991 errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
994 }
995
996 /* Start streaming from the requested point */
997 sentPtr = cmd->startpoint;
998
999 /* Initialize shared memory status, too */
1003
1005
1006 /* Main loop of walsender */
1007 replication_active = true;
1008
1010
1011 replication_active = false;
1012 if (got_STOPPING)
1013 proc_exit(0);
1015
1017 }
1018
1019 if (cmd->slotname)
1021
1022 /*
1023 * Copy is finished now. Send a single-row result set indicating the next
1024 * timeline.
1025 */
1027 {
1028 char startpos_str[8 + 1 + 8 + 1];
1031 TupleDesc tupdesc;
1032 Datum values[2];
1033 bool nulls[2] = {0};
1034
1035 snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
1037
1039
1040 /*
1041 * Need a tuple descriptor representing two columns. int8 may seem
1042 * like a surprising data type for this, but in theory int4 would not
1043 * be wide enough for this, as TimeLineID is unsigned.
1044 */
1045 tupdesc = CreateTemplateTupleDesc(2);
1046 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1047 INT8OID, -1, 0);
1048 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1049 TEXTOID, -1, 0);
1050 TupleDescFinalize(tupdesc);
1051
1052 /* prepare for projection of tuple */
1054
1057
1058 /* send it to dest */
1059 do_tup_output(tstate, values, nulls);
1060
1062 }
1063
1064 /* Send CommandComplete message */
1065 EndReplicationCommand("START_STREAMING");
1066}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition timeline.c:573
int errdetail(const char *fmt,...) pg_attribute_printf(1
void list_free_deep(List *list)
Definition list.c:1560
TimeLineID timeline
Definition replnodes.h:96
static void XLogSendPhysical(void)
Definition walsender.c:3344
int wal_segment_size
Definition xlog.c:150
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition xlogreader.c:108

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire(), SpinLockRelease(), StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsValid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2684 of file walsender.c.

2685{
2687 TransactionId nextXid;
2689
2693
2694 if (xid <= nextXid)
2695 {
2696 if (epoch != nextEpoch)
2697 return false;
2698 }
2699 else
2700 {
2701 if (epoch + 1 != nextEpoch)
2702 return false;
2703 }
2704
2705 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2706 return false; /* epoch OK, but it's wrapped around */
2707
2708 return true;
2709}
#define EpochFromFullTransactionId(x)
Definition transam.h:47
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define XidFromFullTransactionId(x)
Definition transam.h:48
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, fb(), ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 702 of file walsender.c.

703{
704 MemoryContext mcxt;
706 off_t offset = 0;
708
709 /*
710 * parsing the manifest will use the cryptohash stuff, which requires a
711 * resource owner
712 */
717
718 /* Prepare to read manifest data into a temporary context. */
720 "incremental backup information",
723
724 /* Send a CopyInResponse message */
726 pq_sendbyte(&buf, 0);
727 pq_sendint16(&buf, 0);
729 pq_flush();
730
731 /* Receive packets from client until done. */
732 while (HandleUploadManifestPacket(&buf, &offset, ib))
733 ;
734
735 /* Finish up manifest processing. */
737
738 /*
739 * Discard any old manifest information and arrange to preserve the new
740 * information we just got.
741 *
742 * We assume that MemoryContextDelete and MemoryContextSetParent won't
743 * fail, and thus we shouldn't end up bailing out of here in such a way as
744 * to leave dangling pointers.
745 */
751
752 /* clean up the resource owner we created */
754}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:689
MemoryContext CacheMemoryContext
Definition mcxt.c:170
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
void pq_endmessage_reuse(StringInfo buf)
Definition pqformat.c:313
#define PqMsg_CopyInResponse
Definition protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition resowner.c:1026
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition walsender.c:768
static MemoryContext uploaded_manifest_mcxt
Definition walsender.c:173

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, fb(), FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckShutdownTimeout()

static void WalSndCheckShutdownTimeout ( void  )
static

Definition at line 2994 of file walsender.c.

2995{
2997
2998 /* Do nothing if shutdown has not been requested yet */
2999 if (!(got_STOPPING || got_SIGUSR2))
3000 return;
3001
3002 /* Terminate immediately if the timeout is set to 0 */
3005
3006 /*
3007 * Record the shutdown request timestamp even if
3008 * wal_sender_shutdown_timeout is disabled (-1), since the setting may
3009 * change during shutdown and the timestamp will be needed in that case.
3010 */
3012 {
3014 return;
3015 }
3016
3017 /* Do not check the timeout if it's disabled */
3019 return;
3020
3021 /* Terminate immediately if the timeout expires */
3026}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1789
static volatile sig_atomic_t got_SIGUSR2
Definition walsender.c:232
int wal_sender_shutdown_timeout
Definition walsender.c:146
static TimestampTz shutdown_request_timestamp
Definition walsender.c:210
static pg_noreturn void WalSndDoneImmediate(void)
Definition walsender.c:3741

References GetCurrentTimestamp(), got_SIGUSR2, got_STOPPING, now(), shutdown_request_timestamp, TimestampDifferenceExceeds(), wal_sender_shutdown_timeout, and WalSndDoneImmediate().

Referenced by ProcessPendingWrites(), WalSndDone(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2964 of file walsender.c.

2965{
2967
2968 /* don't bail out if we're doing something that doesn't require timeouts */
2969 if (last_reply_timestamp <= 0)
2970 return;
2971
2974
2976 {
2977 /*
2978 * Since typically expiration of replication timeout means
2979 * communication problem, we don't send the error message to the
2980 * standby.
2981 */
2983 (errmsg("terminating walsender process due to replication timeout")));
2984
2986 }
2987}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
int wal_sender_timeout
Definition walsender.c:143

References COMMERROR, ereport, errmsg, fb(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2907 of file walsender.c.

2908{
2910 long sleeptime = 10000; /* 10 s */
2911
2913 {
2914 /*
2915 * At the latest stop sleeping once wal_sender_timeout has been
2916 * reached.
2917 */
2920
2921 /*
2922 * If no ping has been sent yet, wakeup when it's time to do so.
2923 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2924 * the timeout passed without a response.
2925 */
2928 wal_sender_timeout / 2);
2929
2930 /* Compute relative time until wakeup. */
2932 }
2933
2935 {
2936 long shutdown_sleeptime;
2937
2940
2942
2943 /* Choose the earliest wakeup. */
2946 }
2947
2948 return sleeptime;
2949}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765

References fb(), last_reply_timestamp, now(), shutdown_request_timestamp, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_shutdown_timeout, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndDone(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3792 of file walsender.c.

3793{
3795
3796 /* ... let's just be real sure we're caught up ... */
3797 send_data();
3798
3799 /*
3800 * To figure out whether all WAL has successfully been replicated, check
3801 * flush location if valid, write otherwise. Tools like pg_receivewal will
3802 * usually (unless in synchronous mode) return an invalid flush location.
3803 */
3806
3809 {
3810 QueryCompletion qc;
3811
3813
3814 /* Inform the standby that XLOG streaming is done */
3816 EndCommandExtended(&qc, DestRemote, false, true);
3818
3819 /*
3820 * Reset last_reply_timestamp so subsequent WalSndComputeSleeptime()
3821 * calls ignore wal_sender_timeout during shutdown.
3822 */
3824
3825 /*
3826 * Do not call pq_flush() here, since it can block indefinitely while
3827 * waiting for the socket to become writable, preventing
3828 * wal_sender_shutdown_timeout from being enforced. Instead, use the
3829 * walsender nonblocking flush path so the shutdown timeout continues
3830 * to be checked while the send buffer drains.
3831 */
3832 for (;;)
3833 {
3834 long sleeptime;
3835
3836 /*
3837 * During shutdown, die if the shutdown timeout expires. Call this
3838 * before WalSndComputeSleeptime() so the timeout is considered
3839 * when computing sleep time.
3840 */
3842
3843 if (!pq_is_send_pending())
3844 break;
3845
3847
3848 /* Sleep until something happens or we time out */
3851
3852 /* Clear any already-pending wakeups */
3854
3856
3857 /* Try to flush pending output to the client */
3858 if (pq_flush_if_writable() != 0)
3860 }
3861
3862 proc_exit(0);
3863 }
3866}
void EndCommandExtended(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output, bool noblock)
Definition dest.c:170
XLogRecPtr flush
XLogRecPtr write
static bool WalSndCaughtUp
Definition walsender.c:229
static bool shutdown_stream_done_queued
Definition walsender.c:217

References Assert, CHECK_FOR_INTERRUPTS, DestRemote, EndCommandExtended(), fb(), WalSnd::flush, GetCurrentTimestamp(), InvalidXLogRecPtr, last_reply_timestamp, MyLatch, MyWalSnd, pq_flush_if_writable, pq_is_send_pending, proc_exit(), ResetLatch(), sentPtr, SetQueryCompletion(), shutdown_stream_done_queued, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckShutdownTimeout(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndShutdown(), WalSndWait(), WL_SOCKET_WRITEABLE, WalSnd::write, and XLogRecPtrIsValid.

Referenced by WalSndLoop().

◆ WalSndDoneImmediate()

static void WalSndDoneImmediate ( void  )
static

Definition at line 3741 of file walsender.c.

3742{
3744
3745 if ((state == WALSNDSTATE_CATCHUP ||
3749 {
3750 QueryCompletion qc;
3751
3752 /* Try to inform receiver that XLOG streaming is done */
3754 EndCommandExtended(&qc, DestRemote, false, true);
3756
3757 /*
3758 * Note that the output buffer may be full during the forced shutdown
3759 * of walsender. If pq_flush() is called at that time, the walsender
3760 * process will be stuck. Therefore, call pq_flush_if_writable()
3761 * instead. Successful reception of the done message with the
3762 * walsender forced into a shutdown is not guaranteed.
3763 */
3765 }
3766
3767 /*
3768 * Prevent ereport from attempting to send any more messages to the
3769 * standby. Otherwise, it can cause the process to get stuck if the output
3770 * buffers are full.
3771 */
3774
3776 (errmsg("terminating walsender process due to replication shutdown timeout"),
3777 errdetail("Walsender process might have been terminated before all WAL data was replicated to the receiver.")));
3778
3779 proc_exit(0);
3780}
@ DestNone
Definition dest.h:87
CommandDest whereToSendOutput
Definition postgres.c:97
@ WALSNDSTATE_STREAMING

References DestNone, DestRemote, EndCommandExtended(), ereport, errdetail(), errmsg, fb(), MyWalSnd, pq_flush_if_writable, proc_exit(), SetQueryCompletion(), shutdown_stream_done_queued, WalSnd::state, WALSNDSTATE_CATCHUP, WALSNDSTATE_STOPPING, WALSNDSTATE_STREAMING, WARNING, and whereToSendOutput.

Referenced by WalSndCheckShutdownTimeout().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 377 of file walsender.c.

378{
383
384 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
386
387 if (MyReplicationSlot != NULL)
389
391
392 replication_active = false;
393
394 /*
395 * If there is a transaction in progress, it will clean up our
396 * ResourceOwner, but if a replication command set up a resource owner
397 * without a transaction, we've got to clean that up now.
398 */
401
403 proc_exit(0);
404
405 /* Revert back to startup state */
407}
void pgaio_error_cleanup(void)
Definition aio.c:1175
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition lwlock.c:1866
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:861
WALOpenSegment seg
Definition xlogreader.h:271
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5043

References ConditionVariableCancelSleep(), fb(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 4196 of file walsender.c.

4197{
4198 switch (state)
4199 {
4201 return "startup";
4202 case WALSNDSTATE_BACKUP:
4203 return "backup";
4205 return "catchup";
4207 return "streaming";
4209 return "stopping";
4210 }
4211 return "UNKNOWN";
4212}
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndHandleConfigReload()

static void WalSndHandleConfigReload ( void  )
static

Definition at line 1679 of file walsender.c.

1680{
1682 return;
1683
1684 ConfigReloadPending = false;
1687
1688 /*
1689 * Recheck and release any now-satisfied waiters after config reload
1690 * changes synchronous replication requirements (e.g., reducing the number
1691 * of sync standbys or changing the standby names).
1692 */
1695}
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27

References am_cascading_walsender, ConfigReloadPending, PGC_SIGHUP, ProcessConfigFile(), SyncRepInitConfig(), and SyncRepReleaseWaiters().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 4113 of file walsender.c.

4114{
4115 int i;
4116
4117 for (i = 0; i < max_wal_senders; i++)
4118 {
4120 pid_t pid;
4121
4122 SpinLockAcquire(&walsnd->mutex);
4123 pid = walsnd->pid;
4124 SpinLockRelease(&walsnd->mutex);
4125
4126 if (pid == 0)
4127 continue;
4128
4130 }
4131}
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:296
@ PROCSIG_WALSND_INIT_STOPPING
Definition procsignal.h:35

References fb(), i, INVALID_PROC_NUMBER, max_wal_senders, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4425 of file walsender.c.

4426{
4428
4429 /*
4430 * Don't send keepalive messages if timeouts are globally disabled or
4431 * we're doing something not partaking in timeouts.
4432 */
4434 return;
4435
4437 return;
4438
4439 /*
4440 * If half of wal_sender_timeout has lapsed without receiving any reply
4441 * from the standby, send a keep-alive message to the standby requesting
4442 * an immediate reply.
4443 */
4445 wal_sender_timeout / 2);
4447 {
4449
4450 /* Try to flush pending output to the client */
4451 if (pq_flush_if_writable() != 0)
4453 }
4454}

References fb(), InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3250 of file walsender.c.

3251{
3253
3254 Assert(walsnd != NULL);
3255
3256 MyWalSnd = NULL;
3257
3258 SpinLockAcquire(&walsnd->mutex);
3259 /* Mark WalSnd struct as no longer being in use. */
3260 walsnd->pid = 0;
3261 SpinLockRelease(&walsnd->mutex);
3262}

References Assert, fb(), MyWalSnd, SpinLockAcquire(), and SpinLockRelease().

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3960 of file walsender.c.

3961{
3962 got_SIGUSR2 = true;
3964}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 3030 of file walsender.c.

3031{
3033
3034 /*
3035 * Initialize the last reply timestamp. That enables timeout processing
3036 * from hereon.
3037 */
3040
3041 /*
3042 * Loop until we reach the end of this timeline or the client requests to
3043 * stop streaming.
3044 */
3045 for (;;)
3046 {
3047 /* Clear any already-pending wakeups */
3049
3051
3052 /* Process any requests or signals received recently */
3054
3055 /* Check for input from the client */
3057
3058 /*
3059 * If we have received CopyDone from the client, sent CopyDone
3060 * ourselves, and the output buffer is empty, it's time to exit
3061 * streaming.
3062 */
3065 break;
3066
3067 /*
3068 * If we don't have any pending data in the output buffer, try to send
3069 * some more. If there is some, we don't bother to call send_data
3070 * again until we've flushed it ... but we'd better assume we are not
3071 * caught up.
3072 */
3073 if (!pq_is_send_pending())
3074 send_data();
3075 else
3076 WalSndCaughtUp = false;
3077
3078 /* Try to flush pending output to the client */
3079 if (pq_flush_if_writable() != 0)
3081
3082 /* If nothing remains to be sent right now ... */
3084 {
3085 /*
3086 * If we're in catchup state, move to streaming. This is an
3087 * important state change for users to know about, since before
3088 * this point data loss might occur if the primary dies and we
3089 * need to failover to the standby. The state change is also
3090 * important for synchronous replication, since commits that
3091 * started to wait at that point might wait for some time.
3092 */
3094 {
3096 (errmsg_internal("\"%s\" has now caught up with upstream server",
3099 }
3100
3101 /*
3102 * When SIGUSR2 arrives, we send any outstanding logs up to the
3103 * shutdown checkpoint record (i.e., the latest record), wait for
3104 * them to be replicated to the standby, and exit. This may be a
3105 * normal termination at shutdown, or a promotion, the walsender
3106 * is not sure which.
3107 */
3108 if (got_SIGUSR2)
3110 }
3111
3112 /* Check for replication timeout. */
3114
3115 /*
3116 * During shutdown, die if the shutdown timeout expires. Call this
3117 * before WalSndComputeSleeptime() so the timeout is considered when
3118 * computing sleep time.
3119 */
3121
3122 /* Send keepalive if the time has come */
3124
3125 /*
3126 * Block if we have unsent data. XXX For logical replication, let
3127 * WalSndWaitForWal() handle any other blocking; idle receivers need
3128 * its additional actions. For physical replication, also block if
3129 * caught up; its send_data does not block.
3130 *
3131 * The IO statistics are reported in WalSndWaitForWal() for the
3132 * logical WAL senders.
3133 */
3137 {
3138 long sleeptime;
3139 int wakeEvents;
3141
3144 else
3145 wakeEvents = 0;
3146
3147 /*
3148 * Use fresh timestamp, not last_processing, to reduce the chance
3149 * of reaching wal_sender_timeout before sending a keepalive.
3150 */
3153
3154 if (pq_is_send_pending())
3156
3157 /* Report IO statistics, if needed */
3160 {
3161 pgstat_flush_io(false);
3163 last_flush = now;
3164 }
3165
3166 /* Sleep until something happens or we time out */
3168 }
3169 }
3170}
char * application_name
Definition guc_tables.c:589
bool pgstat_flush_backend(bool nowait, uint32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition pgstat_io.c:175
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition walsender.c:107
static void WalSndDone(WalSndSendDataCallback send_data)
Definition walsender.c:3792

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg_internal(), fb(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1607 of file walsender.c.

1608{
1609 /* can't have sync rep confused by sending the same LSN several times */
1610 if (!last_write)
1611 lsn = InvalidXLogRecPtr;
1612
1613 resetStringInfo(ctx->out);
1614
1616 pq_sendint64(ctx->out, lsn); /* dataStart */
1617 pq_sendint64(ctx->out, lsn); /* walEnd */
1618
1619 /*
1620 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1621 * reserve space here.
1622 */
1623 pq_sendint64(ctx->out, 0); /* sendtime */
1624}
#define PqReplMsg_WALData
Definition protocol.h:77

References fb(), InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), PqReplMsg_WALData, and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3913 of file walsender.c.

3914{
3915 int i;
3916
3917 for (i = 0; i < max_wal_senders; i++)
3918 {
3920
3921 SpinLockAcquire(&walsnd->mutex);
3922 if (walsnd->pid == 0)
3923 {
3924 SpinLockRelease(&walsnd->mutex);
3925 continue;
3926 }
3927 walsnd->needreload = true;
3928 SpinLockRelease(&walsnd->mutex);
3929 }
3930}

References fb(), i, max_wal_senders, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 3266 of file walsender.c.

3268{
3269 char path[MAXPGPATH];
3270
3271 /*-------
3272 * When reading from a historic timeline, and there is a timeline switch
3273 * within this segment, read from the WAL segment belonging to the new
3274 * timeline.
3275 *
3276 * For example, imagine that this server is currently on timeline 5, and
3277 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3278 * 0/13002088. In pg_wal, we have these files:
3279 *
3280 * ...
3281 * 000000040000000000000012
3282 * 000000040000000000000013
3283 * 000000050000000000000013
3284 * 000000050000000000000014
3285 * ...
3286 *
3287 * In this situation, when requested to send the WAL from segment 0x13, on
3288 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3289 * recovery prefers files from newer timelines, so if the segment was
3290 * restored from the archive on this server, the file belonging to the old
3291 * timeline, 000000040000000000000013, might not exist. Their contents are
3292 * equal up to the switchpoint, because at a timeline switch, the used
3293 * portion of the old segment is copied to the new file.
3294 */
3297 {
3299
3300 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3301 if (nextSegNo == endSegNo)
3303 }
3304
3305 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3306 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3307 if (state->seg.ws_file >= 0)
3308 return;
3309
3310 /*
3311 * If the file is not found, assume it's because the standby asked for a
3312 * too old WAL segment that has already been removed or recycled.
3313 */
3314 if (errno == ENOENT)
3315 {
3316 char xlogfname[MAXFNAMELEN];
3317 int save_errno = errno;
3318
3320 errno = save_errno;
3321 ereport(ERROR,
3323 errmsg("requested WAL segment %s has already been removed",
3324 xlogfname)));
3325 }
3326 else
3327 ereport(ERROR,
3329 errmsg("could not open file \"%s\": %m",
3330 path)));
3331}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition fd.c:1090
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg, ERROR, fb(), MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 4177 of file walsender.c.

4178{
4180
4182
4183 if (walsnd->state == state)
4184 return;
4185
4186 SpinLockAcquire(&walsnd->mutex);
4187 walsnd->state = state;
4188 SpinLockRelease(&walsnd->mutex);
4189}

References am_walsender, Assert, fb(), MyWalSnd, SpinLockAcquire(), and SpinLockRelease().

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

static void WalSndShmemInit ( void arg)
static

Definition at line 4001 of file walsender.c.

4002{
4003 for (int i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
4005
4006 for (int i = 0; i < max_wal_senders; i++)
4007 {
4009
4010 SpinLockInit(&walsnd->mutex);
4011 }
4012
4016}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition ilist.h:314
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv

References ConditionVariableInit(), dlist_init(), fb(), i, max_wal_senders, NUM_SYNC_REP_WAIT_MODE, SpinLockInit(), WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WalSndCtlData::walsnds.

◆ WalSndShmemRequest()

static void WalSndShmemRequest ( void arg)
static

Definition at line 3987 of file walsender.c.

3988{
3989 Size size;
3990
3991 size = offsetof(WalSndCtlData, walsnds);
3992 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3993 ShmemRequestStruct(.name = "Wal Sender Ctl",
3994 .size = size,
3995 .ptr = (void **) &WalSndCtl,
3996 );
3997}
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
Size mul_size(Size s1, Size s2)
Definition mcxt.c:1752
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References add_size(), fb(), max_wal_senders, mul_size(), name, ShmemRequestStruct, and WalSndCtl.

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 413 of file walsender.c.

414{
415 /*
416 * Reset whereToSendOutput to prevent ereport from attempting to send any
417 * more messages to the standby.
418 */
421
422 proc_exit(0);
423}

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3968 of file walsender.c.

3969{
3970 /* Set up signal handlers */
3972 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3973 pqsignal(SIGTERM, die); /* request shutdown */
3974 /* SIGQUIT handler was already set up by InitPostmasterChild */
3975 InitializeTimeouts(); /* establishes SIGALRM handler */
3978 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3979 * shutdown */
3980
3981 /* Reset some signals that are accepted by postmaster but not here */
3983}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
#define die(msg)
#define pqsignal
Definition port.h:548
#define PG_SIG_IGN
Definition port.h:552
#define PG_SIG_DFL
Definition port.h:551
void StatementCancelHandler(SIGNAL_ARGS)
Definition postgres.c:3065
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:696
void InitializeTimeouts(void)
Definition timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition walsender.c:3960
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGUSR2
Definition win32_port.h:171

References die, fb(), InitializeTimeouts(), PG_SIG_DFL, PG_SIG_IGN, pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1758 of file walsender.c.

1760{
1761 static TimestampTz sendTime = 0;
1763 bool pending_writes = false;
1764 bool end_xact = ctx->end_xact;
1765
1766 /*
1767 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1768 * avoid flooding the lag tracker when we commit frequently.
1769 *
1770 * We don't have a mechanism to get the ack for any LSN other than end
1771 * xact LSN from the downstream. So, we track lag only for end of
1772 * transaction LSN.
1773 */
1774#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1775 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1777 {
1778 LagTrackerWrite(lsn, now);
1779 sendTime = now;
1780 }
1781
1782 /*
1783 * When skipping empty transactions in synchronous replication, we send a
1784 * keepalive message to avoid delaying such transactions.
1785 *
1786 * It is okay to check sync_standbys_status without lock here as in the
1787 * worst case we will just send an extra keepalive message when it is
1788 * really not required.
1789 */
1790 if (skipped_xact &&
1791 SyncRepRequested() &&
1792 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1793 {
1794 WalSndKeepalive(false, lsn);
1795
1796 /* Try to flush pending output to the client */
1797 if (pq_flush_if_writable() != 0)
1799
1800 /* If we have pending write here, make sure it's actually flushed */
1801 if (pq_is_send_pending())
1802 pending_writes = true;
1803 }
1804
1805 /*
1806 * Process pending writes if any or try to send a keepalive if required.
1807 * We don't need to try sending keepalive messages at the transaction end
1808 * as that will be done at a later point in time. This is required only
1809 * for large transactions where we don't send any changes to the
1810 * downstream and the receiver can time out due to that.
1811 */
1812 if (pending_writes || (!end_xact &&
1814 wal_sender_timeout / 2)))
1816}
#define SyncRepRequested()
Definition syncrep.h:18
static void ProcessPendingWrites(void)
Definition walsender.c:1702
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition walsender.c:4463
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, fb(), GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 4055 of file walsender.c.

4056{
4057 WaitEvent event;
4058
4060
4061 /*
4062 * We use a condition variable to efficiently wake up walsenders in
4063 * WalSndWakeup().
4064 *
4065 * Every walsender prepares to sleep on a shared memory CV. Note that it
4066 * just prepares to sleep on the CV (i.e., adds itself to the CV's
4067 * waitlist), but does not actually wait on the CV (IOW, it never calls
4068 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
4069 * waiting, because we also need to wait for socket events. The processes
4070 * (startup process, walreceiver etc.) wanting to wake up walsenders use
4071 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
4072 * walsenders come out of WaitEventSetWait().
4073 *
4074 * This approach is simple and efficient because one doesn't have to loop
4075 * through all the walsender slots, with a spinlock acquisition and
4076 * release for every iteration, just to wake up only the waiting
4077 * walsenders. It makes WalSndWakeup() callers' life easy.
4078 *
4079 * XXX: A desirable future improvement would be to add support for CVs
4080 * into WaitEventSetWait().
4081 *
4082 * And, we use separate shared memory CVs for physical and logical
4083 * walsenders for selective wake ups, see WalSndWakeup() for more details.
4084 *
4085 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
4086 * until awakened by physical walsenders after the walreceiver confirms
4087 * the receipt of the LSN.
4088 */
4095
4096 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
4097 (event.events & WL_POSTMASTER_DEATH))
4098 {
4100 proc_exit(1);
4101 }
4102
4104}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition libpq.h:66
WaitEventSet * FeBeWaitSet
Definition pqcomm.c:167
uint32 events
ReplicationKind kind
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, fb(), FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndDone(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1908 of file walsender.c.

1909{
1910 int wakeEvents;
1911 uint32 wait_event = 0;
1914
1915 /*
1916 * Fast path to avoid acquiring the spinlock in case we already know we
1917 * have enough WAL available and all the standby servers have confirmed
1918 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1919 * if we're far behind.
1920 */
1923 return RecentFlushPtr;
1924
1925 /*
1926 * Within the loop, we wait for the necessary WALs to be flushed to disk
1927 * first, followed by waiting for standbys to catch up if there are enough
1928 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1929 */
1930 for (;;)
1931 {
1932 bool wait_for_standby_at_stop = false;
1933 long sleeptime;
1935
1936 /* Clear any already-pending wakeups */
1938
1940
1941 /* Process any requests or signals received recently */
1943
1944 /* Check for input from the client */
1946
1947 /*
1948 * If we're shutting down, trigger pending WAL to be written out,
1949 * otherwise we'd possibly end up waiting for WAL that never gets
1950 * written, because walwriter has shut down already.
1951 *
1952 * Note that GetXLogInsertEndRecPtr() is used to obtain the WAL flush
1953 * request location instead of GetXLogInsertRecPtr(). Because if the
1954 * last WAL record ends at a page boundary, GetXLogInsertRecPtr() can
1955 * return an LSN pointing past the page header, which may cause
1956 * XLogFlush() to report an error.
1957 */
1960
1961 /*
1962 * To avoid the scenario where standbys need to catch up to a newer
1963 * WAL location in each iteration, we update our idea of the currently
1964 * flushed position only if we are not waiting for standbys to catch
1965 * up.
1966 */
1968 {
1969 if (!RecoveryInProgress())
1971 else
1973 }
1974
1975 /*
1976 * If postmaster asked us to stop and the standby slots have caught up
1977 * to the flushed position, don't wait anymore.
1978 *
1979 * It's important to do this check after the recomputation of
1980 * RecentFlushPtr, so we can send all remaining data before shutting
1981 * down.
1982 */
1983 if (got_STOPPING)
1984 {
1987 else
1988 break;
1989 }
1990
1991 /*
1992 * We only send regular messages to the client for fully decoded
1993 * transactions, but a synchronous replication and walsender shutdown
1994 * possibly are waiting for a later location. So, before sleeping, we
1995 * send a ping containing the flush location. If the receiver is
1996 * otherwise idle, this keepalive will trigger a reply. Processing the
1997 * reply will update these MyWalSnd locations.
1998 */
1999 if (MyWalSnd->flush < sentPtr &&
2000 MyWalSnd->write < sentPtr &&
2003
2004 /*
2005 * Exit the loop if we are already caught up and don't need to wait for
2006 * standby slots.
2007 */
2010 break;
2011
2012 /*
2013 * Waiting for new WAL or waiting for standbys to catch up. Since we
2014 * need to wait, we're now caught up.
2015 */
2016 WalSndCaughtUp = true;
2017
2018 /*
2019 * Try to flush any pending output to the client.
2020 */
2021 if (pq_flush_if_writable() != 0)
2023
2024 /*
2025 * If we have received CopyDone from the client, sent CopyDone
2026 * ourselves, and the output buffer is empty, it's time to exit
2027 * streaming, so fail the current WAL fetch request.
2028 */
2031 break;
2032
2033 /* die if timeout was reached */
2035
2036 /*
2037 * During shutdown, die if the shutdown timeout expires. Call this
2038 * before WalSndComputeSleeptime() so the timeout is considered when
2039 * computing sleep time.
2040 */
2042
2043 /* Send keepalive if the time has come */
2045
2046 /*
2047 * Sleep until something happens or we time out. Also wait for the
2048 * socket becoming writable, if there's still pending output.
2049 * Otherwise we might sit on sendable output data while waiting for
2050 * new WAL to be generated. (But if we have nothing to send, we don't
2051 * want to wake on socket-writable.)
2052 */
2055
2057
2058 if (pq_is_send_pending())
2060
2061 Assert(wait_event != 0);
2062
2063 /* Report IO statistics, if needed */
2066 {
2067 pgstat_flush_io(false);
2069 last_flush = now;
2070 }
2071
2073 }
2074
2075 /* reactivate latch so WalSndLoop knows to continue */
2077 return RecentFlushPtr;
2078}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1880
XLogRecPtr GetXLogInsertEndRecPtr(void)
Definition xlog.c:10112
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801

References Assert, CHECK_FOR_INTERRUPTS, fb(), WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogInsertEndRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogFlush(), and XLogRecPtrIsValid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 4139 of file walsender.c.

4140{
4141 for (;;)
4142 {
4143 int i;
4144 bool all_stopped = true;
4145
4146 for (i = 0; i < max_wal_senders; i++)
4147 {
4149
4150 SpinLockAcquire(&walsnd->mutex);
4151
4152 if (walsnd->pid == 0)
4153 {
4154 SpinLockRelease(&walsnd->mutex);
4155 continue;
4156 }
4157
4158 if (walsnd->state != WALSNDSTATE_STOPPING)
4159 {
4160 all_stopped = false;
4161 SpinLockRelease(&walsnd->mutex);
4162 break;
4163 }
4164 SpinLockRelease(&walsnd->mutex);
4165 }
4166
4167 /* safe to leave if confirmation is done for all WAL senders */
4168 if (all_stopped)
4169 return;
4170
4171 pg_usleep(10000L); /* wait for 10 msec */
4172 }
4173}
void pg_usleep(long microsec)
Definition signal.c:53

References fb(), i, max_wal_senders, pg_usleep(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 4034 of file walsender.c.

4035{
4036 /*
4037 * Wake up all the walsenders waiting on WAL being flushed or replayed
4038 * respectively. Note that waiting walsender would have prepared to sleep
4039 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
4040 * before actually waiting.
4041 */
4042 if (physical)
4044
4045 if (logical)
4047}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1634 of file walsender.c.

1636{
1638
1639 /*
1640 * Fill the send timestamp last, so that it is taken as late as possible.
1641 * This is somewhat ugly, but the protocol is set as it's already used for
1642 * several releases by streaming physical replication.
1643 */
1647 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1648 tmpbuf.data, sizeof(int64));
1649
1650 /* output previously gathered data in a CopyData packet */
1652
1654
1655 /* Try to flush pending output to the client */
1656 if (pq_flush_if_writable() != 0)
1658
1659 /* Try taking fast path unless we get too close to walsender timeout. */
1661 wal_sender_timeout / 2) &&
1663 {
1664 return;
1665 }
1666
1667 /* If we have pending write here, go to slow path */
1669}
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, memcpy(), now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), PqMsg_CopyData, ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3654 of file walsender.c.

3655{
3656 XLogRecord *record;
3657 char *errm;
3658
3659 /*
3660 * We'll use the current flush point to determine whether we've caught up.
3661 * This variable is static in order to cache it across calls. Caching is
3662 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3663 * spinlock.
3664 */
3666
3667 /*
3668 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3669 * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3670 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3671 * didn't wait - i.e. when we're shutting down.
3672 */
3673 WalSndCaughtUp = false;
3674
3676
3677 /* xlog record was invalid */
3678 if (errm != NULL)
3679 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3680 errm);
3681
3682 if (record != NULL)
3683 {
3684 /*
3685 * Note the lack of any call to LagTrackerWrite(), which is handled by
3686 * WalSndUpdateProgress(), which is called by the output plugin through
3687 * the logical decoding write API.
3688 */
3690
3692 }
3693
3694 /*
3695 * If first time through in this session, initialize flushPtr. Otherwise,
3696 * we only need to update flushPtr if EndRecPtr is past it.
3697 */
3700 {
3701 /*
3702 * For cascading logical WAL senders, we use the replay LSN instead of
3703 * the flush LSN, since logical decoding on a standby only processes
3704 * WAL that has been replayed. This distinction becomes particularly
3705 * important during shutdown, as new WAL is no longer replayed and the
3706 * last replayed LSN marks the furthest point up to which decoding can
3707 * proceed.
3708 */
3711 else
3713 }
3714
3715 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3717 WalSndCaughtUp = true;
3718
3719 /*
3720 * If we're caught up and have been requested to stop, have WalSndLoop()
3721 * terminate the connection in an orderly manner, after writing out all
3722 * the pending data.
3723 */
3725 got_SIGUSR2 = true;
3726
3727 /* Update shared memory status */
3728 {
3730
3731 SpinLockAcquire(&walsnd->mutex);
3732 walsnd->sentPtr = sentPtr;
3733 SpinLockRelease(&walsnd->mutex);
3734 }
3735}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition decode.c:89
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:391

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, fb(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), MyWalSnd, LogicalDecodingContext::reader, sentPtr, SpinLockAcquire(), SpinLockRelease(), WalSndCaughtUp, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3344 of file walsender.c.

3345{
3347 XLogRecPtr startptr;
3348 XLogRecPtr endptr;
3349 Size nbytes;
3350 XLogSegNo segno;
3352 Size rbytes;
3353
3354 /* If requested switch the WAL sender to the stopping state. */
3355 if (got_STOPPING)
3357
3359 {
3360 WalSndCaughtUp = true;
3361 return;
3362 }
3363
3364 /* Figure out how far we can safely send the WAL. */
3366 {
3367 /*
3368 * Streaming an old timeline that's in this server's history, but is
3369 * not the one we're currently inserting or replaying. It can be
3370 * streamed up to the point where we switched off that timeline.
3371 */
3373 }
3374 else if (am_cascading_walsender)
3375 {
3377
3378 /*
3379 * Streaming the latest timeline on a standby.
3380 *
3381 * Attempt to send all WAL that has already been replayed, so that we
3382 * know it's valid. If we're receiving WAL through streaming
3383 * replication, it's also OK to send any WAL that has been received
3384 * but not replayed.
3385 *
3386 * The timeline we're recovering from can change, or we can be
3387 * promoted. In either case, the current timeline becomes historic. We
3388 * need to detect that so that we don't try to stream past the point
3389 * where we switched to another timeline. We check for promotion or
3390 * timeline switch after calculating FlushPtr, to avoid a race
3391 * condition: if the timeline becomes historic just after we checked
3392 * that it was still current, it's still OK to stream it up to the
3393 * FlushPtr that was calculated before it became historic.
3394 */
3395 bool becameHistoric = false;
3396
3398
3399 if (!RecoveryInProgress())
3400 {
3401 /* We have been promoted. */
3403 am_cascading_walsender = false;
3404 becameHistoric = true;
3405 }
3406 else
3407 {
3408 /*
3409 * Still a cascading standby. But is the timeline we're sending
3410 * still the one recovery is recovering from?
3411 */
3413 becameHistoric = true;
3414 }
3415
3416 if (becameHistoric)
3417 {
3418 /*
3419 * The timeline we were sending has become historic. Read the
3420 * timeline history file of the new timeline to see where exactly
3421 * we forked off from the timeline we were sending.
3422 */
3423 List *history;
3424
3427
3430
3432
3434 }
3435 }
3436 else
3437 {
3438 /*
3439 * Streaming the current timeline on a primary.
3440 *
3441 * Attempt to send all data that's already been written out and
3442 * fsync'd to disk. We cannot go further than what's been written out
3443 * given the current implementation of WALRead(). And in any case
3444 * it's unsafe to send WAL that is not securely down to disk on the
3445 * primary: if the primary subsequently crashes and restarts, standbys
3446 * must not have applied any WAL that got lost on the primary.
3447 */
3449 }
3450
3451 /*
3452 * Record the current system time as an approximation of the time at which
3453 * this WAL location was written for the purposes of lag tracking.
3454 *
3455 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3456 * is flushed and we could get that time as well as the LSN when we call
3457 * GetFlushRecPtr() above (and likewise for the cascading standby
3458 * equivalent), but rather than putting any new code into the hot WAL path
3459 * it seems good enough to capture the time here. We should reach this
3460 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3461 * may take some time, we read the WAL flush pointer and take the time
3462 * very close together here so that we'll get a later position if it is
3463 * still moving.
3464 *
3465 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3466 * this gives us a cheap approximation for the WAL flush time for this
3467 * LSN.
3468 *
3469 * Note that the LSN is not necessarily the LSN for the data contained in
3470 * the present message; it's the end of the WAL, which might be further
3471 * ahead. All the lag tracking machinery cares about is finding out when
3472 * that arbitrary LSN is eventually reported as written, flushed and
3473 * applied, so that it can measure the elapsed time.
3474 */
3476
3477 /*
3478 * If this is a historic timeline and we've reached the point where we
3479 * forked to the next timeline, stop streaming.
3480 *
3481 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3482 * startup process will normally replay all WAL that has been received
3483 * from the primary, before promoting, but if the WAL streaming is
3484 * terminated at a WAL page boundary, the valid portion of the timeline
3485 * might end in the middle of a WAL record. We might've already sent the
3486 * first half of that partial WAL record to the cascading standby, so that
3487 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3488 * replay the partial WAL record either, so it can still follow our
3489 * timeline switch.
3490 */
3492 {
3493 /* close the current file. */
3494 if (xlogreader->seg.ws_file >= 0)
3496
3497 /* Send CopyDone */
3499 streamingDoneSending = true;
3500
3501 WalSndCaughtUp = true;
3502
3503 elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
3506 return;
3507 }
3508
3509 /* Do we have any work to do? */
3511 if (SendRqstPtr <= sentPtr)
3512 {
3513 WalSndCaughtUp = true;
3514 return;
3515 }
3516
3517 /*
3518 * Figure out how much to send in one message. If there's no more than
3519 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3520 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3521 *
3522 * The rounding is not only for performance reasons. Walreceiver relies on
3523 * the fact that we never split a WAL record across two messages. Since a
3524 * long WAL record is split at page boundary into continuation records,
3525 * page boundary is always a safe cut-off point. We also assume that
3526 * SendRqstPtr never points to the middle of a WAL record.
3527 */
3528 startptr = sentPtr;
3529 endptr = startptr;
3530 endptr += MAX_SEND_SIZE;
3531
3532 /* if we went beyond SendRqstPtr, back off */
3533 if (SendRqstPtr <= endptr)
3534 {
3535 endptr = SendRqstPtr;
3537 WalSndCaughtUp = false;
3538 else
3539 WalSndCaughtUp = true;
3540 }
3541 else
3542 {
3543 /* round down to page boundary. */
3544 endptr -= (endptr % XLOG_BLCKSZ);
3545 WalSndCaughtUp = false;
3546 }
3547
3548 nbytes = endptr - startptr;
3549 Assert(nbytes <= MAX_SEND_SIZE);
3550
3551 /*
3552 * OK to read and send the slice.
3553 */
3556
3557 pq_sendint64(&output_message, startptr); /* dataStart */
3558 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3559 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3560
3561 /*
3562 * Read the log directly into the output buffer to avoid extra memcpy
3563 * calls.
3564 */
3566
3567retry:
3568 /* attempt to read WAL from WAL buffers first */
3570 startptr, nbytes, xlogreader->seg.ws_tli);
3572 startptr += rbytes;
3573 nbytes -= rbytes;
3574
3575 /* now read the remaining WAL from WAL file */
3576 if (nbytes > 0 &&
3579 startptr,
3580 nbytes,
3581 xlogreader->seg.ws_tli, /* Pass the current TLI because
3582 * only WalSndSegmentOpen controls
3583 * whether new TLI is needed. */
3584 &errinfo))
3586
3587 /* See logical_read_xlog_page(). */
3588 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3590
3591 /*
3592 * During recovery, the currently-open WAL file might be replaced with the
3593 * file of the same name retrieved from archive. So we always need to
3594 * check what we read was valid after reading into the buffer. If it's
3595 * invalid, we try to open and read the file again.
3596 */
3598 {
3600 bool reload;
3601
3602 SpinLockAcquire(&walsnd->mutex);
3603 reload = walsnd->needreload;
3604 walsnd->needreload = false;
3605 SpinLockRelease(&walsnd->mutex);
3606
3607 if (reload && xlogreader->seg.ws_file >= 0)
3608 {
3610
3611 goto retry;
3612 }
3613 }
3614
3615 output_message.len += nbytes;
3617
3618 /*
3619 * Fill the send timestamp last, so that it is taken as late as possible.
3620 */
3623 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3624 tmpbuf.data, sizeof(int64));
3625
3627
3628 sentPtr = endptr;
3629
3630 /* Update shared memory status */
3631 {
3633
3634 SpinLockAcquire(&walsnd->mutex);
3635 walsnd->sentPtr = sentPtr;
3636 SpinLockRelease(&walsnd->mutex);
3637 }
3638
3639 /* Report progress of XLOG streaming in PS display */
3641 {
3642 char activitymsg[50];
3643
3644 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
3647 }
3648}
bool update_process_title
Definition ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition stringinfo.c:337
TimeLineID ws_tli
Definition xlogreader.h:49
WALSegmentContext segcxt
Definition xlogreader.h:270
#define MAX_SEND_SIZE
Definition walsender.c:118
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition xlog.c:1789

References am_cascading_walsender, Assert, CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), fb(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, memcpy(), MyWalSnd, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqMsg_CopyDone, PqReplMsg_WALData, readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 138 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 279 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 243 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

◆ replication_active

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ shutdown_request_timestamp

TimestampTz shutdown_request_timestamp = 0
static

Definition at line 210 of file walsender.c.

Referenced by WalSndCheckShutdownTimeout(), and WalSndComputeSleeptime().

◆ shutdown_stream_done_queued

bool shutdown_stream_done_queued = false
static

Definition at line 217 of file walsender.c.

Referenced by WalSndDone(), and WalSndDoneImmediate().

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 226 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 172 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 173 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 155 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_shutdown_timeout

int wal_sender_shutdown_timeout = -1

Definition at line 146 of file walsender.c.

Referenced by WalSndCheckShutdownTimeout(), and WalSndComputeSleeptime().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ WalSndShmemCallbacks

const ShmemCallbacks WalSndShmemCallbacks
Initial value:
= {
.request_fn = WalSndShmemRequest,
.init_fn = WalSndShmemInit,
}
static void WalSndShmemRequest(void *arg)
Definition walsender.c:3987
static void WalSndShmemInit(void *arg)
Definition walsender.c:4001

Definition at line 126 of file walsender.c.

126 {
127 .request_fn = WalSndShmemRequest,
128 .init_fn = WalSndShmemInit,
129};

◆ xlogreader