PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/subsystems.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndShmemRequest (void *arg)
 
static void WalSndShmemInit (void *arg)
 
static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static pg_noreturn void WalSndDoneImmediate (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessStandbyPSRequestMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndCheckShutdownTimeout (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void WalSndHandleConfigReload (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
const ShmemCallbacks WalSndShmemCallbacks
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
int wal_sender_shutdown_timeout = -1
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static TimestampTz shutdown_request_timestamp = 0
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 246 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 118 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 107 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 278 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd * cmd)
static

Definition at line 1443 of file walsender.c.

1444{
1445 bool failover_given = false;
1446 bool two_phase_given = false;
1447 bool failover;
1448 bool two_phase;
1449
1450 /* Parse options */
1452 {
1453 if (strcmp(defel->defname, "failover") == 0)
1454 {
1455 if (failover_given)
1456 ereport(ERROR,
1458 errmsg("conflicting or redundant options")));
1459 failover_given = true;
1461 }
1462 else if (strcmp(defel->defname, "two_phase") == 0)
1463 {
1464 if (two_phase_given)
1465 ereport(ERROR,
1467 errmsg("conflicting or redundant options")));
1468 two_phase_given = true;
1470 }
1471 else
1472 elog(ERROR, "unrecognized option: %s", defel->defname);
1473 }
1474
1478}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
static char * errmsg
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static bool two_phase
static bool failover
static int fb(int x)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:960

References defGetBoolean(), elog, ereport, errcode(), errmsg, ERROR, failover, fb(), foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1220 of file walsender.c.

1221{
1222 const char *snapshot_name = NULL;
1223 char xloc[MAXFNAMELEN];
1224 char *slot_name;
1225 bool reserve_wal = false;
1226 bool two_phase = false;
1227 bool failover = false;
1231 TupleDesc tupdesc;
1232 Datum values[4];
1233 bool nulls[4] = {0};
1234
1236
1238 &failover);
1239
1240 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1241 {
1242 ReplicationSlotCreate(cmd->slotname, false,
1244 false, false, false, false);
1245
1246 if (reserve_wal)
1247 {
1249
1251
1252 /* Write this slot to disk if it's a permanent one. */
1253 if (!cmd->temporary)
1255 }
1256 }
1257 else
1258 {
1260 bool need_full_snapshot = false;
1261
1263
1265
1266 /*
1267 * Initially create persistent slot as ephemeral - that allows us to
1268 * nicely handle errors during initialization because it'll get
1269 * dropped if this transaction fails. We'll make it persistent at the
1270 * end. Temporary slots can be created as temporary from beginning as
1271 * they get dropped on error as well.
1272 */
1276
1277 /*
1278 * Do options check early so that we can bail before calling the
1279 * DecodingContextFindStartpoint which can take long time.
1280 */
1282 {
1283 if (IsTransactionBlock())
1284 ereport(ERROR,
1285 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1286 (errmsg("%s must not be called inside a transaction",
1287 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1288
1289 need_full_snapshot = true;
1290 }
1292 {
1293 if (!IsTransactionBlock())
1294 ereport(ERROR,
1295 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1296 (errmsg("%s must be called inside a transaction",
1297 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1298
1300 ereport(ERROR,
1301 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1302 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1303 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1304 if (!XactReadOnly)
1305 ereport(ERROR,
1306 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1307 (errmsg("%s must be called in a read-only transaction",
1308 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1309
1310 if (FirstSnapshotSet)
1311 ereport(ERROR,
1312 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1313 (errmsg("%s must be called before any query",
1314 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1315
1316 if (IsSubTransaction())
1317 ereport(ERROR,
1318 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1319 (errmsg("%s must not be called in a subtransaction",
1320 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1321
1322 need_full_snapshot = true;
1323 }
1324
1325 /*
1326 * Ensure the logical decoding is enabled before initializing the
1327 * logical decoding context.
1328 */
1331
1333 false,
1336 .segment_open = WalSndSegmentOpen,
1337 .segment_close = wal_segment_close),
1340
1341 /*
1342 * Signal that we don't need the timeout mechanism. We're just
1343 * creating the replication slot and don't yet accept feedback
1344 * messages or send keepalives. As we possibly need to wait for
1345 * further WAL the walsender would otherwise possibly be killed too
1346 * soon.
1347 */
1349
1350 /* build initial snapshot, might take a while */
1352
1353 /*
1354 * Export or use the snapshot if we've been asked to do so.
1355 *
1356 * NB. We will convert the snapbuild.c kind of snapshot to normal
1357 * snapshot when doing this.
1358 */
1360 {
1362 }
1364 {
1365 Snapshot snap;
1366
1369 }
1370
1371 /* don't need the decoding context anymore */
1373
1374 if (!cmd->temporary)
1376 }
1377
1378 snprintf(xloc, sizeof(xloc), "%X/%08X",
1380
1382
1383 /*----------
1384 * Need a tuple descriptor representing four columns:
1385 * - first field: the slot name
1386 * - second field: LSN at which we became consistent
1387 * - third field: exported snapshot's name
1388 * - fourth field: output plugin
1389 */
1390 tupdesc = CreateTemplateTupleDesc(4);
1391 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1392 TEXTOID, -1, 0);
1393 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1394 TEXTOID, -1, 0);
1395 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1396 TEXTOID, -1, 0);
1397 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1398 TEXTOID, -1, 0);
1399 TupleDescFinalize(tupdesc);
1400
1401 /* prepare for projection of tuples */
1403
1404 /* slot_name */
1405 slot_name = NameStr(MyReplicationSlot->data.name);
1406 values[0] = CStringGetTextDatum(slot_name);
1407
1408 /* consistent wal location */
1410
1411 /* snapshot name, or NULL if none */
1412 if (snapshot_name != NULL)
1414 else
1415 nulls[2] = true;
1416
1417 /* plugin, or NULL if none */
1418 if (cmd->plugin != NULL)
1420 else
1421 nulls[3] = true;
1422
1423 /* send it to dest */
1424 do_tup_output(tstate, values, nulls);
1426
1428}
int16 AttrNumber
Definition attnum.h:21
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define NameStr(name)
Definition c.h:835
#define Assert(condition)
Definition c.h:943
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition dest.c:113
@ DestRemoteSimple
Definition dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define false
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:673
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:629
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, bool for_repack, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:325
void CheckLogicalDecodingRequirements(bool repack)
Definition logical.c:111
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:202
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:303
#define NIL
Definition pg_list.h:68
#define snprintf
Definition port.h:260
uint64_t Datum
Definition postgres.h:70
@ REPLICATION_KIND_PHYSICAL
Definition replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition replnodes.h:23
void ReplicationSlotMarkDirty(void)
Definition slot.c:1184
void ReplicationSlotReserveWal(void)
Definition slot.c:1711
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
void ReplicationSlotPersist(void)
Definition slot.c:1201
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
void ReplicationSlotSave(void)
Definition slot.c:1166
void ReplicationSlotRelease(void)
Definition slot.c:769
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:458
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition snapbuild.c:556
bool FirstSnapshotSet
Definition snapmgr.c:193
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
Definition snapmgr.c:1853
PGPROC * MyProc
Definition proc.c:71
ReplicationKind kind
Definition replnodes.h:56
struct SnapBuild * snapshot_builder
Definition logical.h:44
ReplicationSlotPersistentData data
Definition slot.h:213
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:165
void TupleDescFinalize(TupleDesc tupdesc)
Definition tupdesc.c:511
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:976
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition walsender.c:1143
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition walsender.c:3237
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1605
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition walsender.c:1729
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition walsender.c:1070
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1578
static TimestampTz last_reply_timestamp
Definition walsender.c:204
CRSSnapshotAction
Definition walsender.h:21
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition walsender.h:22
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
bool IsSubTransaction(void)
Definition xact.c:5095
bool IsTransactionBlock(void)
Definition xact.c:5022
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), EnsureLogicalDecodingEnabled(), ereport, errmsg, ERROR, failover, fb(), FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsLogicalDecodingEnabled(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1434 of file walsender.c.

1435{
1436 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1437}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:920

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char * cmd_string)

Definition at line 2058 of file walsender.c.

2059{
2060 yyscan_t scanner;
2061 int parse_rc;
2062 Node *cmd_node;
2063 const char *cmdtag;
2065
2066 /* We save and re-use the cmd_context across calls */
2068
2069 /*
2070 * If WAL sender has been told that shutdown is getting close, switch its
2071 * status accordingly to handle the next replication commands correctly.
2072 */
2073 if (got_STOPPING)
2075
2076 /*
2077 * Throw error if in stopping mode. We need prevent commands that could
2078 * generate WAL while the shutdown checkpoint is being written. To be
2079 * safe, we just prohibit all new commands.
2080 */
2082 ereport(ERROR,
2084 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2085
2086 /*
2087 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2088 * command arrives. Clean up the old stuff if there's anything.
2089 */
2091
2093
2094 /*
2095 * Prepare to parse and execute the command.
2096 *
2097 * Because replication command execution can involve beginning or ending
2098 * transactions, we need a working context that will survive that, so we
2099 * make it a child of TopMemoryContext. That in turn creates a hazard of
2100 * long-lived memory leaks if we lose track of the working context. We
2101 * deal with that by creating it only once per walsender, and resetting it
2102 * for each new command. (Normally this reset is a no-op, but if the
2103 * prior exec_replication_command call failed with an error, it won't be.)
2104 *
2105 * This is subtler than it looks. The transactions we manage can extend
2106 * across replication commands, indeed SnapBuildClearExportedSnapshot
2107 * might have just ended one. Because transaction exit will revert to the
2108 * memory context that was current at transaction start, we need to be
2109 * sure that that context is still valid. That motivates re-using the
2110 * same cmd_context rather than making a new one each time.
2111 */
2112 if (cmd_context == NULL)
2114 "Replication command context",
2116 else
2118
2120
2122
2123 /*
2124 * Is it a WalSender command?
2125 */
2127 {
2128 /* Nope; clean up and get out. */
2130
2133
2134 /* XXX this is a pretty random place to make this check */
2135 if (MyDatabaseId == InvalidOid)
2136 ereport(ERROR,
2138 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2139
2140 /* Tell the caller that this wasn't a WalSender command. */
2141 return false;
2142 }
2143
2144 /*
2145 * Looks like a WalSender command, so parse it.
2146 */
2148 if (parse_rc != 0)
2149 ereport(ERROR,
2151 errmsg_internal("replication command parser returned %d",
2152 parse_rc)));
2154
2155 /*
2156 * Report query to various monitoring facilities. For this purpose, we
2157 * report replication commands just like SQL commands.
2158 */
2160
2162
2163 /*
2164 * Log replication command if log_replication_commands is enabled. Even
2165 * when it's disabled, log the command with DEBUG1 level for backward
2166 * compatibility.
2167 */
2169 (errmsg("received replication command: %s", cmd_string)));
2170
2171 /*
2172 * Disallow replication commands in aborted transaction blocks.
2173 */
2175 ereport(ERROR,
2177 errmsg("current transaction is aborted, "
2178 "commands ignored until end of transaction block")));
2179
2181
2182 /*
2183 * Allocate buffers that will be used for each outgoing and incoming
2184 * message. We do this just once per command to reduce palloc overhead.
2185 */
2189
2190 switch (cmd_node->type)
2191 {
2193 cmdtag = "IDENTIFY_SYSTEM";
2197 break;
2198
2200 cmdtag = "READ_REPLICATION_SLOT";
2204 break;
2205
2206 case T_BaseBackupCmd:
2207 cmdtag = "BASE_BACKUP";
2212 break;
2213
2215 cmdtag = "CREATE_REPLICATION_SLOT";
2219 break;
2220
2222 cmdtag = "DROP_REPLICATION_SLOT";
2226 break;
2227
2229 cmdtag = "ALTER_REPLICATION_SLOT";
2233 break;
2234
2236 {
2238
2239 cmdtag = "START_REPLICATION";
2242
2243 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2244 StartReplication(cmd);
2245 else
2247
2248 /* dupe, but necessary per libpqrcv_endstreaming */
2250
2252 break;
2253 }
2254
2256 cmdtag = "TIMELINE_HISTORY";
2261 break;
2262
2263 case T_VariableShowStmt:
2264 {
2267
2268 cmdtag = "SHOW";
2270
2271 /* syscache access needs a transaction environment */
2273 GetPGVariable(n->name, dest);
2276 }
2277 break;
2278
2280 cmdtag = "UPLOAD_MANIFEST";
2285 break;
2286
2287 default:
2288 elog(ERROR, "unrecognized replication command node tag: %u",
2289 cmd_node->type);
2290 }
2291
2292 /*
2293 * Done. Revert to caller's memory context, and clean out the cmd_context
2294 * to recover memory right away.
2295 */
2298
2299 /*
2300 * We need not update ps display or pg_stat_activity, because PostgresMain
2301 * will reset those to "idle". But we must reset debug_query_string to
2302 * ensure it doesn't become a dangling pointer.
2303 */
2305
2306 return true;
2307}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition basebackup.c:992
void * yyscan_t
Definition cubedata.h:65
void EndReplicationCommand(const char *commandTag)
Definition dest.c:206
#define LOG
Definition elog.h:32
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define DEBUG1
Definition elog.h:31
Oid MyDatabaseId
Definition globals.c:96
void GetPGVariable(const char *name, DestReceiver *dest)
Definition guc_funcs.c:410
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopMemoryContext
Definition mcxt.c:166
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const char * debug_query_string
Definition postgres.c:94
#define InvalidOid
static void set_ps_display(const char *activity)
Definition ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
void replication_scanner_finish(yyscan_t yyscanner)
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
void SnapBuildClearExportedSnapshot(void)
Definition snapbuild.c:617
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition nodes.h:135
ReplicationKind kind
Definition replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition walsender.c:1443
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition walsender.c:604
WalSnd * MyWalSnd
Definition walsender.c:132
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition walsender.c:504
static StringInfoData tmpbuf
Definition walsender.c:195
static void IdentifySystem(void)
Definition walsender.c:422
static StringInfoData reply_message
Definition walsender.c:194
void WalSndSetState(WalSndState state)
Definition walsender.c:4101
static StringInfoData output_message
Definition walsender.c:193
static void UploadManifest(void)
Definition walsender.c:695
static volatile sig_atomic_t got_STOPPING
Definition walsender.c:226
bool log_replication_commands
Definition walsender.c:150
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition walsender.c:1220
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition walsender.c:1485
static IncrementalBackupInfo * uploaded_manifest
Definition walsender.c:172
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition walsender.c:1434
static void StartReplication(StartReplicationCmd *cmd)
Definition walsender.c:837
static XLogReaderState * xlogreader
Definition walsender.c:162
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
void StartTransactionCommand(void)
Definition xact.c:3109
bool IsAbortedTransactionBlockState(void)
Definition xact.c:409
void CommitTransactionCommand(void)
Definition xact.c:3207

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg, errmsg_internal(), ERROR, fb(), GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)

Definition at line 3804 of file walsender.c.

3805{
3807 TimeLineID replayTLI;
3811
3813
3814 /*
3815 * We can safely send what's already been replayed. Also, if walreceiver
3816 * is streaming WAL from the same timeline, we can send anything that it
3817 * has streamed, but hasn't been replayed yet.
3818 */
3819
3821 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3822
3823 if (tli)
3824 *tli = replayTLI;
3825
3826 result = replayPtr;
3827 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3829
3830 return result;
3831}
uint32 result
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1897
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition walsender.c:136
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, fb(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), receiveTLI, and result.

Referenced by IdentifySystem(), StartReplication(), update_local_synced_slot(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t offset,
IncrementalBackupInfo ib 
)
static

Definition at line 761 of file walsender.c.

763{
764 int mtype;
765 int maxmsglen;
766
768
770 mtype = pq_getbyte();
771 if (mtype == EOF)
774 errmsg("unexpected EOF on client connection with an open transaction")));
775
776 switch (mtype)
777 {
778 case PqMsg_CopyData:
780 break;
781 case PqMsg_CopyDone:
782 case PqMsg_CopyFail:
783 case PqMsg_Flush:
784 case PqMsg_Sync:
786 break;
787 default:
790 errmsg("unexpected message type 0x%02X during COPY from stdin",
791 mtype)));
792 maxmsglen = 0; /* keep compiler quiet */
793 break;
794 }
795
796 /* Now collect the message body */
800 errmsg("unexpected EOF on client connection with an open transaction")));
802
803 /* Process the message */
804 switch (mtype)
805 {
806 case PqMsg_CopyData:
808 return true;
809
810 case PqMsg_CopyDone:
811 return false;
812
813 case PqMsg_Sync:
814 case PqMsg_Flush:
815 /* Ignore these while in CopyOut mode as we do elsewhere. */
816 return true;
817
818 case PqMsg_CopyFail:
821 errmsg("COPY from stdin failed: %s",
823 }
824
825 /* Not reached. */
826 Assert(false);
827 return false;
828}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
#define PQ_SMALL_MESSAGE_LIMIT
Definition libpq.h:33
#define PQ_LARGE_MESSAGE_LIMIT
Definition libpq.h:34
#define HOLD_CANCEL_INTERRUPTS()
Definition miscadmin.h:144
#define RESUME_CANCEL_INTERRUPTS()
Definition miscadmin.h:146
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pq_getmessage(StringInfo s, int maxlen)
Definition pqcomm.c:1204
int pq_getbyte(void)
Definition pqcomm.c:964
void pq_startmsgread(void)
Definition pqcomm.c:1142
const char * pq_getmsgstring(StringInfo msg)
Definition pqformat.c:578
#define PqMsg_CopyDone
Definition protocol.h:64
#define PqMsg_CopyData
Definition protocol.h:65
#define PqMsg_Sync
Definition protocol.h:27
#define PqMsg_CopyFail
Definition protocol.h:29
#define PqMsg_Flush
Definition protocol.h:24

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, ERROR, fb(), HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_CopyFail, PqMsg_Flush, PqMsg_Sync, and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3860 of file walsender.c.

3861{
3863
3864 /*
3865 * If replication has not yet started, die like with SIGTERM. If
3866 * replication is active, only set a flag and wake up the main loop. It
3867 * will send any outstanding WAL, wait for it to be replicated to the
3868 * standby, and then exit gracefully.
3869 */
3870 if (!replication_active)
3872 else
3873 got_STOPPING = true;
3874
3875 /* latch will be set by procsignal_sigusr1_handler */
3876}
int MyProcPid
Definition globals.c:49
bool am_walsender
Definition walsender.c:135
static volatile sig_atomic_t replication_active
Definition walsender.c:234
#define kill(pid, sig)
Definition win32_port.h:490

References am_walsender, Assert, fb(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 422 of file walsender.c.

423{
424 char sysid[32];
425 char xloc[MAXFNAMELEN];
427 char *dbname = NULL;
430 TupleDesc tupdesc;
431 Datum values[4];
432 bool nulls[4] = {0};
433 TimeLineID currTLI;
434
435 /*
436 * Reply with a result set with one row, four columns. First col is system
437 * ID, second is timeline ID, third is current xlog location and the
438 * fourth contains the database name if we are connected to one.
439 */
440
443
446 logptr = GetStandbyFlushRecPtr(&currTLI);
447 else
448 logptr = GetFlushRecPtr(&currTLI);
449
450 snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
451
453 {
455
456 /* syscache access needs a transaction env. */
459 /* copy dbname out of TX context */
462 }
463
465
466 /* need a tuple descriptor representing four columns */
467 tupdesc = CreateTemplateTupleDesc(4);
468 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
469 TEXTOID, -1, 0);
470 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
471 INT8OID, -1, 0);
472 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
473 TEXTOID, -1, 0);
474 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
475 TEXTOID, -1, 0);
476 TupleDescFinalize(tupdesc);
477
478 /* prepare for projection of tuples */
480
481 /* column 1: system identifier */
483
484 /* column 2: timeline */
485 values[1] = Int64GetDatum(currTLI);
486
487 /* column 3: wal location */
489
490 /* column 4: database name, or NULL if none */
491 if (dbname)
493 else
494 nulls[3] = true;
495
496 /* send it to dest */
497 do_tup_output(tstate, values, nulls);
498
500}
#define UINT64_FORMAT
Definition c.h:635
struct cursor * cur
Definition ecpg.c:29
char * get_database_name(Oid dbid)
Definition lsyscache.c:1312
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
static Datum Int64GetDatum(int64 X)
Definition postgres.h:413
char * dbname
Definition streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition walsender.c:3804
uint64 GetSystemIdentifier(void)
Definition xlog.c:4647
bool RecoveryInProgress(void)
Definition xlog.c:6830
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6995

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), fb(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 323 of file walsender.c.

324{
326
327 /* Create a per-walsender data structure in shared memory */
329
330 /* need resource owner for e.g. basebackups */
332
333 /*
334 * Let postmaster know that we're a WAL sender. Once we've declared us as
335 * a WAL sender process, postmaster will let us outlive the bgwriter and
336 * kill us last in the shutdown sequence, so we get a chance to stream all
337 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
338 * there's no going back, and we mustn't write any WAL records after this.
339 */
342
343 /*
344 * If the client didn't specify a database to connect to, show in PGPROC
345 * that our advertised xmin should affect vacuum horizons in all
346 * databases. This allows physical replication clients to send hot
347 * standby feedback that will delay vacuum cleanup in all databases.
348 */
350 {
356 }
357
358 /* Initialize empty timestamp buffer for lag tracking. */
360}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1266
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:164
void MarkPostmasterChildWalSender(void)
Definition pmsignal.c:308
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition pmsignal.h:44
#define PROC_AFFECTS_ALL_HORIZONS
Definition proc.h:66
void CreateAuxProcessResourceOwner(void)
Definition resowner.c:996
PROC_HDR * ProcGlobal
Definition proc.c:74
TransactionId xmin
Definition proc.h:242
uint8 statusFlags
Definition proc.h:210
int pgxactoff
Definition proc.h:207
uint8 * statusFlags
Definition proc.h:456
#define InvalidTransactionId
Definition transam.h:31
static void InitWalSenderSlot(void)
Definition walsender.c:3145
static LagTracker * lag_tracker
Definition walsender.c:272

References am_cascading_walsender, Assert, CreateAuxProcessResourceOwner(), fb(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 3145 of file walsender.c.

3146{
3147 int i;
3148
3149 /*
3150 * WalSndCtl should be set up already (we inherit this by fork() or
3151 * EXEC_BACKEND mechanism from the postmaster).
3152 */
3153 Assert(WalSndCtl != NULL);
3154 Assert(MyWalSnd == NULL);
3155
3156 /*
3157 * Find a free walsender slot and reserve it. This must not fail due to
3158 * the prior check for free WAL senders in InitProcess().
3159 */
3160 for (i = 0; i < max_wal_senders; i++)
3161 {
3163
3164 SpinLockAcquire(&walsnd->mutex);
3165
3166 if (walsnd->pid != 0)
3167 {
3168 SpinLockRelease(&walsnd->mutex);
3169 continue;
3170 }
3171 else
3172 {
3173 /*
3174 * Found a free slot. Reserve it for us.
3175 */
3176 walsnd->pid = MyProcPid;
3177 walsnd->state = WALSNDSTATE_STARTUP;
3178 walsnd->sentPtr = InvalidXLogRecPtr;
3179 walsnd->needreload = false;
3180 walsnd->write = InvalidXLogRecPtr;
3181 walsnd->flush = InvalidXLogRecPtr;
3182 walsnd->apply = InvalidXLogRecPtr;
3183 walsnd->writeLag = -1;
3184 walsnd->flushLag = -1;
3185 walsnd->applyLag = -1;
3186 walsnd->sync_standby_priority = 0;
3187 walsnd->replyTime = 0;
3188
3189 /*
3190 * The kind assignment is done here and not in StartReplication()
3191 * and StartLogicalReplication(). Indeed, the logical walsender
3192 * needs to read WAL records (like snapshot of running
3193 * transactions) during the slot creation. So it needs to be woken
3194 * up based on its kind.
3195 *
3196 * The kind assignment could also be done in StartReplication(),
3197 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
3198 * seems better to set it in one place.
3199 */
3200 if (MyDatabaseId == InvalidOid)
3202 else
3204
3205 SpinLockRelease(&walsnd->mutex);
3206 /* don't need the lock anymore */
3207 MyWalSnd = walsnd;
3208
3209 break;
3210 }
3211 }
3212
3213 Assert(MyWalSnd != NULL);
3214
3215 /* Arrange to clean up at walsender exit */
3217}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
int max_wal_senders
Definition walsender.c:141
static void WalSndKill(int code, Datum arg)
Definition walsender.c:3221
WalSndCtlData * WalSndCtl
Definition walsender.c:121
@ WALSNDSTATE_STARTUP

References Assert, fb(), i, InvalidOid, InvalidXLogRecPtr, max_wal_senders, MyDatabaseId, MyProcPid, MyWalSnd, on_shmem_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, and WALSNDSTATE_STARTUP.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4445 of file walsender.c.

4446{
4447 TimestampTz time = 0;
4448
4449 /*
4450 * If 'lsn' has not passed the WAL position stored in the overflow entry,
4451 * return the elapsed time (in microseconds) since the saved local flush
4452 * time. If the flush time is in the future (due to clock drift), return
4453 * -1 to treat as no valid sample.
4454 *
4455 * Otherwise, switch back to using the buffer to control the read head and
4456 * compute the elapsed time. The read head is then reset to point to the
4457 * oldest entry in the buffer.
4458 */
4459 if (lag_tracker->read_heads[head] == -1)
4460 {
4461 if (lag_tracker->overflowed[head].lsn > lsn)
4462 return (now >= lag_tracker->overflowed[head].time) ?
4463 now - lag_tracker->overflowed[head].time : -1;
4464
4465 time = lag_tracker->overflowed[head].time;
4467 lag_tracker->read_heads[head] =
4469 }
4470
4471 /* Read all unread samples up to this LSN or end of buffer. */
4472 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4474 {
4476 lag_tracker->last_read[head] =
4478 lag_tracker->read_heads[head] =
4480 }
4481
4482 /*
4483 * If the lag tracker is empty, that means the standby has processed
4484 * everything we've ever sent so we should now clear 'last_read'. If we
4485 * didn't do that, we'd risk using a stale and irrelevant sample for
4486 * interpolation at the beginning of the next burst of WAL after a period
4487 * of idleness.
4488 */
4490 lag_tracker->last_read[head].time = 0;
4491
4492 if (time > now)
4493 {
4494 /* If the clock somehow went backwards, treat as not found. */
4495 return -1;
4496 }
4497 else if (time == 0)
4498 {
4499 /*
4500 * We didn't cross a time. If there is a future sample that we
4501 * haven't reached yet, and we've already reached at least one sample,
4502 * let's interpolate the local flushed time. This is mainly useful
4503 * for reporting a completely stuck apply position as having
4504 * increasing lag, since otherwise we'd have to wait for it to
4505 * eventually start moving again and cross one of our samples before
4506 * we can show the lag increasing.
4507 */
4509 {
4510 /* There are no future samples, so we can't interpolate. */
4511 return -1;
4512 }
4513 else if (lag_tracker->last_read[head].time != 0)
4514 {
4515 /* We can interpolate between last_read and the next sample. */
4516 double fraction;
4517 WalTimeSample prev = lag_tracker->last_read[head];
4519
4520 if (lsn < prev.lsn)
4521 {
4522 /*
4523 * Reported LSNs shouldn't normally go backwards, but it's
4524 * possible when there is a timeline change. Treat as not
4525 * found.
4526 */
4527 return -1;
4528 }
4529
4530 Assert(prev.lsn < next.lsn);
4531
4532 if (prev.time > next.time)
4533 {
4534 /* If the clock somehow went backwards, treat as not found. */
4535 return -1;
4536 }
4537
4538 /* See how far we are between the previous and next samples. */
4539 fraction =
4540 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4541
4542 /* Scale the local flush time proportionally. */
4543 time = (TimestampTz)
4544 ((double) prev.time + (next.time - prev.time) * fraction);
4545 }
4546 else
4547 {
4548 /*
4549 * We have only a future sample, implying that we were entirely
4550 * caught up but now there is a new burst of WAL and the
4551 * standby hasn't processed the first sample yet. Until the
4552 * standby reaches the future sample the best we can do is report
4553 * the hypothetical lag if that sample were to be replayed now.
4554 */
4556 }
4557 }
4558
4559 /* Return the elapsed time since local flush time in microseconds. */
4560 Assert(time != 0);
4561 return now - time;
4562}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
static int32 next
Definition blutils.c:225
int64 TimestampTz
Definition timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition walsender.c:252
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:254
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:255
int write_head
Definition walsender.c:253
WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:269
TimestampTz time
Definition walsender.c:242
XLogRecPtr lsn
Definition walsender.c:241
#define LAG_TRACKER_BUFFER_SIZE
Definition walsender.c:246

References Assert, LagTracker::buffer, fb(), lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4387 of file walsender.c.

4388{
4389 int new_write_head;
4390 int i;
4391
4392 if (!am_walsender)
4393 return;
4394
4395 /*
4396 * If the lsn hasn't advanced since last time, then do nothing. This way
4397 * we only record a new sample when new WAL has been written.
4398 */
4399 if (lag_tracker->last_lsn == lsn)
4400 return;
4401 lag_tracker->last_lsn = lsn;
4402
4403 /*
4404 * If advancing the write head of the circular buffer would crash into any
4405 * of the read heads, then the buffer is full. In other words, the
4406 * slowest reader (presumably apply) is the one that controls the release
4407 * of space.
4408 */
4410 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4411 {
4412 /*
4413 * If the buffer is full, move the slowest reader to a separate
4414 * overflow entry and free its space in the buffer so the write head
4415 * can advance.
4416 */
4418 {
4421 lag_tracker->read_heads[i] = -1;
4422 }
4423 }
4424
4425 /* Store a sample at the current write head position. */
4429}
XLogRecPtr last_lsn
Definition walsender.c:251
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27

References am_walsender, LagTracker::buffer, fb(), i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState * state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char * cur_page 
)
static

Definition at line 1070 of file walsender.c.

1072{
1074 int count;
1076 XLogSegNo segno;
1077 TimeLineID currTLI;
1078
1079 /*
1080 * Make sure we have enough WAL available before retrieving the current
1081 * timeline.
1082 */
1084
1085 /* Fail if not enough (implies we are going to shut down) */
1087 return -1;
1088
1089 /*
1090 * Since logical decoding is also permitted on a standby server, we need
1091 * to check if the server is in recovery to decide how to get the current
1092 * timeline ID (so that it also covers the promotion or timeline change
1093 * cases). We must determine am_cascading_walsender after waiting for the
1094 * required WAL so that it is correct when the walsender wakes up after a
1095 * promotion.
1096 */
1098
1100 GetXLogReplayRecPtr(&currTLI);
1101 else
1102 currTLI = GetWALInsertionTimeLine();
1103
1105 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1106 sendTimeLine = state->currTLI;
1107 sendTimeLineValidUpto = state->currTLIValidUntil;
1108 sendTimeLineNextTLI = state->nextTLI;
1109
1111 count = XLOG_BLCKSZ; /* more than one block available */
1112 else
1113 count = flushptr - targetPagePtr; /* part of the page available */
1114
1115 /* now actually read the data, we know it's there */
1116 if (!WALRead(state,
1117 cur_page,
1119 count,
1120 currTLI, /* Pass the current TLI because only
1121 * WalSndSegmentOpen controls whether new TLI
1122 * is needed. */
1123 &errinfo))
1125
1126 /*
1127 * After reading into the buffer, check that what we read was valid. We do
1128 * this after reading, because even though the segment was present when we
1129 * opened it, it might get recycled or removed while we read it. The
1130 * read() succeeds in that case, but the data we tried to read might
1131 * already have been overwritten with new WAL records.
1132 */
1133 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1134 CheckXLogRemoved(segno, state->seg.ws_tli);
1135
1136 return count;
1137}
static TimeLineID sendTimeLine
Definition walsender.c:181
static bool sendTimeLineIsHistoric
Definition walsender.c:183
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition walsender.c:1879
static TimeLineID sendTimeLineNextTLI
Definition walsender.c:182
static XLogRecPtr sendTimeLineValidUpto
Definition walsender.c:184
TimeLineID GetWALInsertionTimeLine(void)
Definition xlog.c:7016
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition xlog.c:3782
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition xlogdefs.h:52
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1819 of file walsender.c.

1820{
1821 int elevel = got_STOPPING ? ERROR : WARNING;
1822 bool failover_slot;
1823
1825
1826 /*
1827 * Note that after receiving the shutdown signal, an ERROR is reported if
1828 * any slots are dropped, invalidated, or inactive. This measure is taken
1829 * to prevent the walsender from waiting indefinitely.
1830 */
1832 {
1834 return true;
1835 }
1836
1837 *wait_event = 0;
1838 return false;
1839}
#define WARNING
Definition elog.h:37
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3113

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, fb(), got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 wait_event 
)
static

Definition at line 1851 of file walsender.c.

1853{
1854 /* Check if we need to wait for WALs to be flushed to disk */
1855 if (target_lsn > flushed_lsn)
1856 {
1858 return true;
1859 }
1860
1861 /* Check if the standby slots have caught up to the flushed position */
1863}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1819

References fb(), and NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 4139 of file walsender.c.

4140{
4142
4143 result->month = 0;
4144 result->day = 0;
4145 result->time = offset;
4146
4147 return result;
4148}
#define palloc_object(type)
Definition fe_memutils.h:74

References palloc_object, and result.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action,
bool two_phase,
bool failover 
)
static

Definition at line 1143 of file walsender.c.

1147{
1148 ListCell *lc;
1149 bool snapshot_action_given = false;
1150 bool reserve_wal_given = false;
1151 bool two_phase_given = false;
1152 bool failover_given = false;
1153
1154 /* Parse options */
1155 foreach(lc, cmd->options)
1156 {
1157 DefElem *defel = (DefElem *) lfirst(lc);
1158
1159 if (strcmp(defel->defname, "snapshot") == 0)
1160 {
1161 char *action;
1162
1164 ereport(ERROR,
1166 errmsg("conflicting or redundant options")));
1167
1169 snapshot_action_given = true;
1170
1171 if (strcmp(action, "export") == 0)
1173 else if (strcmp(action, "nothing") == 0)
1175 else if (strcmp(action, "use") == 0)
1177 else
1178 ereport(ERROR,
1180 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
1181 "CREATE_REPLICATION_SLOT", defel->defname, action)));
1182 }
1183 else if (strcmp(defel->defname, "reserve_wal") == 0)
1184 {
1186 ereport(ERROR,
1188 errmsg("conflicting or redundant options")));
1189
1190 reserve_wal_given = true;
1192 }
1193 else if (strcmp(defel->defname, "two_phase") == 0)
1194 {
1196 ereport(ERROR,
1198 errmsg("conflicting or redundant options")));
1199 two_phase_given = true;
1201 }
1202 else if (strcmp(defel->defname, "failover") == 0)
1203 {
1205 ereport(ERROR,
1207 errmsg("conflicting or redundant options")));
1208 failover_given = true;
1210 }
1211 else
1212 elog(ERROR, "unrecognized option: %s", defel->defname);
1213 }
1214}
char * defGetString(DefElem *def)
Definition define.c:34
#define lfirst(lc)
Definition pg_list.h:172
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), elog, ereport, errcode(), errmsg, ERROR, failover, fb(), CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 4155 of file walsender.c.

4156{
4157#define PG_STAT_GET_WAL_SENDERS_COLS 12
4158 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4160 int num_standbys;
4161 int i;
4162
4163 InitMaterializedSRF(fcinfo, 0);
4164
4165 /*
4166 * Get the currently active synchronous standbys. This could be out of
4167 * date before we're done, but we'll use the data anyway.
4168 */
4170
4171 for (i = 0; i < max_wal_senders; i++)
4172 {
4176 XLogRecPtr flush;
4177 XLogRecPtr apply;
4178 TimeOffset writeLag;
4179 TimeOffset flushLag;
4180 TimeOffset applyLag;
4181 int priority;
4182 int pid;
4184 TimestampTz replyTime;
4185 bool is_sync_standby;
4187 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
4188 int j;
4189
4190 /* Collect data from shared memory */
4191 SpinLockAcquire(&walsnd->mutex);
4192 if (walsnd->pid == 0)
4193 {
4194 SpinLockRelease(&walsnd->mutex);
4195 continue;
4196 }
4197 pid = walsnd->pid;
4198 sent_ptr = walsnd->sentPtr;
4199 state = walsnd->state;
4200 write = walsnd->write;
4201 flush = walsnd->flush;
4202 apply = walsnd->apply;
4203 writeLag = walsnd->writeLag;
4204 flushLag = walsnd->flushLag;
4205 applyLag = walsnd->applyLag;
4206 priority = walsnd->sync_standby_priority;
4207 replyTime = walsnd->replyTime;
4208 SpinLockRelease(&walsnd->mutex);
4209
4210 /*
4211 * Detect whether walsender is/was considered synchronous. We can
4212 * provide some protection against stale data by checking the PID
4213 * along with walsnd_index.
4214 */
4215 is_sync_standby = false;
4216 for (j = 0; j < num_standbys; j++)
4217 {
4218 if (sync_standbys[j].walsnd_index == i &&
4219 sync_standbys[j].pid == pid)
4220 {
4221 is_sync_standby = true;
4222 break;
4223 }
4224 }
4225
4226 values[0] = Int32GetDatum(pid);
4227
4229 {
4230 /*
4231 * Only superusers and roles with privileges of pg_read_all_stats
4232 * can see details. Other users only get the pid value to know
4233 * it's a walsender, but no details.
4234 */
4235 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4236 }
4237 else
4238 {
4240
4242 nulls[2] = true;
4244
4246 nulls[3] = true;
4247 values[3] = LSNGetDatum(write);
4248
4249 if (!XLogRecPtrIsValid(flush))
4250 nulls[4] = true;
4251 values[4] = LSNGetDatum(flush);
4252
4253 if (!XLogRecPtrIsValid(apply))
4254 nulls[5] = true;
4255 values[5] = LSNGetDatum(apply);
4256
4257 /*
4258 * Treat a standby such as a pg_basebackup background process
4259 * which always returns an invalid flush location, as an
4260 * asynchronous standby.
4261 */
4262 priority = XLogRecPtrIsValid(flush) ? priority : 0;
4263
4264 if (writeLag < 0)
4265 nulls[6] = true;
4266 else
4268
4269 if (flushLag < 0)
4270 nulls[7] = true;
4271 else
4273
4274 if (applyLag < 0)
4275 nulls[8] = true;
4276 else
4278
4280
4281 /*
4282 * More easily understood version of standby state. This is purely
4283 * informational.
4284 *
4285 * In quorum-based sync replication, the role of each standby
4286 * listed in synchronous_standby_names can be changing very
4287 * frequently. Any standbys considered as "sync" at one moment can
4288 * be switched to "potential" ones at the next moment. So, it's
4289 * basically useless to report "sync" or "potential" as their sync
4290 * states. We report just "quorum" for them.
4291 */
4292 if (priority == 0)
4293 values[10] = CStringGetTextDatum("async");
4294 else if (is_sync_standby)
4296 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4297 else
4298 values[10] = CStringGetTextDatum("potential");
4299
4300 if (replyTime == 0)
4301 nulls[11] = true;
4302 else
4303 values[11] = TimestampTzGetDatum(replyTime);
4304 }
4305
4306 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4307 values, nulls);
4308 }
4309
4310 return (Datum) 0;
4311}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
#define MemSet(start, val, len)
Definition c.h:1107
int64 TimeOffset
Definition timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
Definition funcapi.c:76
#define write(a, b, c)
Definition win32.h:14
int j
Definition isn.c:78
Oid GetUserId(void)
Definition miscinit.c:470
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
uint8 syncrep_method
Definition syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:767
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition walsender.c:4139
static const char * WalSndGetStateString(WalSndState state)
Definition walsender.c:4120
WalSndState
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References CStringGetTextDatum, fb(), GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, SpinLockAcquire(), SpinLockRelease(), SYNC_REP_PRIORITY, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, write, and XLogRecPtrIsValid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2465 of file walsender.c.

2466{
2467 bool changed = false;
2469
2471 SpinLockAcquire(&slot->mutex);
2472 if (slot->data.restart_lsn != lsn)
2473 {
2474 changed = true;
2475 slot->data.restart_lsn = lsn;
2476 }
2477 SpinLockRelease(&slot->mutex);
2478
2479 if (changed)
2480 {
2484 }
2485
2486 /*
2487 * One could argue that the slot should be saved to disk now, but that'd
2488 * be energy wasted - the worst thing lost information could cause here is
2489 * to give wrong information in a statistics view - we'll just potentially
2490 * be more conservative in removing files.
2491 */
2492}
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1308
slock_t mutex
Definition slot.h:183
void PhysicalWakeupLogicalWalSnd(void)
Definition walsender.c:1794

References Assert, ReplicationSlot::data, fb(), ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2606 of file walsender.c.

2607{
2608 bool changed = false;
2610
2611 SpinLockAcquire(&slot->mutex);
2613
2614 /*
2615 * For physical replication we don't need the interlock provided by xmin
2616 * and effective_xmin since the consequences of a missed increase are
2617 * limited to query cancellations, so set both at once.
2618 */
2619 if (!TransactionIdIsNormal(slot->data.xmin) ||
2622 {
2623 changed = true;
2624 slot->data.xmin = feedbackXmin;
2626 }
2630 {
2631 changed = true;
2634 }
2635 SpinLockRelease(&slot->mutex);
2636
2637 if (changed)
2638 {
2641 }
2642}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1226
TransactionId catalog_xmin
Definition slot.h:122
TransactionId effective_catalog_xmin
Definition slot.h:210
TransactionId effective_xmin
Definition slot.h:209
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire(), SpinLockRelease(), TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1794 of file walsender.c.

1795{
1797
1798 /*
1799 * If we are running in a standby, there is no need to wake up walsenders.
1800 * This is because we do not support syncing slots to cascading standbys,
1801 * so there are no walsenders waiting for standbys to catch up.
1802 */
1803 if (RecoveryInProgress())
1804 return;
1805
1808}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3080
#define SlotIsPhysical(slot)
Definition slot.h:287
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1673 of file walsender.c.

1674{
1675 for (;;)
1676 {
1677 long sleeptime;
1678
1679 /* Check for input from the client */
1681
1682 /* die if timeout was reached */
1684
1685 /*
1686 * During shutdown, die if the shutdown timeout expires. Call this
1687 * before WalSndComputeSleeptime() so the timeout is considered when
1688 * computing sleep time.
1689 */
1691
1692 /* Send keepalive if the time has come */
1694
1695 if (!pq_is_send_pending())
1696 break;
1697
1699
1700 /* Sleep until something happens or we time out */
1703
1704 /* Clear any already-pending wakeups */
1706
1708
1709 /* Process any requests or signals received recently */
1711
1712 /* Try to flush pending output to the client */
1713 if (pq_flush_if_writable() != 0)
1715 }
1716
1717 /* reactivate latch so WalSndLoop knows to continue */
1719}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
struct Latch * MyLatch
Definition globals.c:65
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
#define pq_flush_if_writable()
Definition libpq.h:50
#define pq_is_send_pending()
Definition libpq.h:51
#define WL_SOCKET_READABLE
#define WL_SOCKET_WRITEABLE
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition walsender.c:3979
static void WalSndCheckTimeOut(void)
Definition walsender.c:2935
static void ProcessRepliesIfAny(void)
Definition walsender.c:2314
static void WalSndKeepaliveIfNecessary(void)
Definition walsender.c:4349
static void WalSndCheckShutdownTimeout(void)
Definition walsender.c:2965
static void WalSndHandleConfigReload(void)
Definition walsender.c:1650
static pg_noreturn void WalSndShutdown(void)
Definition walsender.c:406
static long WalSndComputeSleeptime(TimestampTz now)
Definition walsender.c:2878

References CHECK_FOR_INTERRUPTS, fb(), GetCurrentTimestamp(), MyLatch, pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), SetLatch(), WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2314 of file walsender.c.

2315{
2316 unsigned char firstchar;
2317 int maxmsglen;
2318 int r;
2319 bool received = false;
2320
2322
2323 /*
2324 * If we already received a CopyDone from the frontend, any subsequent
2325 * message is the beginning of a new command, and should be processed in
2326 * the main processing loop.
2327 */
2328 while (!streamingDoneReceiving)
2329 {
2332 if (r < 0)
2333 {
2334 /* unexpected error or EOF */
2337 errmsg("unexpected EOF on standby connection")));
2338 proc_exit(0);
2339 }
2340 if (r == 0)
2341 {
2342 /* no data available without blocking */
2343 pq_endmsgread();
2344 break;
2345 }
2346
2347 /* Validate message type and set packet size limit */
2348 switch (firstchar)
2349 {
2350 case PqMsg_CopyData:
2352 break;
2353 case PqMsg_CopyDone:
2354 case PqMsg_Terminate:
2356 break;
2357 default:
2358 ereport(FATAL,
2360 errmsg("invalid standby message type \"%c\"",
2361 firstchar)));
2362 maxmsglen = 0; /* keep compiler quiet */
2363 break;
2364 }
2365
2366 /* Read the message contents */
2369 {
2372 errmsg("unexpected EOF on standby connection")));
2373 proc_exit(0);
2374 }
2375
2376 /* ... and process it */
2377 switch (firstchar)
2378 {
2379 /*
2380 * PqMsg_CopyData means a standby reply wrapped in a CopyData
2381 * packet.
2382 */
2383 case PqMsg_CopyData:
2385 received = true;
2386 break;
2387
2388 /*
2389 * PqMsg_CopyDone means the standby requested to finish
2390 * streaming. Reply with CopyDone, if we had not sent that
2391 * already.
2392 */
2393 case PqMsg_CopyDone:
2395 {
2397 streamingDoneSending = true;
2398 }
2399
2401 received = true;
2402 break;
2403
2404 /*
2405 * PqMsg_Terminate means that the standby is closing down the
2406 * socket.
2407 */
2408 case PqMsg_Terminate:
2409 proc_exit(0);
2410
2411 default:
2412 Assert(false); /* NOT REACHED */
2413 }
2414 }
2415
2416 /*
2417 * Save the last reply timestamp if we've received at least one reply.
2418 */
2419 if (received)
2420 {
2423 }
2424}
#define COMMERROR
Definition elog.h:34
#define FATAL
Definition elog.h:42
void proc_exit(int code)
Definition ipc.c:105
#define pq_putmessage_noblock(msgtype, s, len)
Definition libpq.h:54
int pq_getbyte_if_available(unsigned char *c)
Definition pqcomm.c:1004
void pq_endmsgread(void)
Definition pqcomm.c:1166
#define PqMsg_Terminate
Definition protocol.h:28
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static bool waiting_for_ping_response
Definition walsender.c:207
static TimestampTz last_processing
Definition walsender.c:198
static bool streamingDoneSending
Definition walsender.c:218
static void ProcessStandbyMessage(void)
Definition walsender.c:2430
static bool streamingDoneReceiving
Definition walsender.c:219

References Assert, COMMERROR, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, FATAL, fb(), GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2686 of file walsender.c.

2687{
2692 TimestampTz replyTime;
2693
2694 /*
2695 * Decipher the reply message. The caller already consumed the msgtype
2696 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2697 * of this message.
2698 */
2699 replyTime = pq_getmsgint64(&reply_message);
2704
2706 {
2707 char *replyTimeStr;
2708
2709 /* Copy because timestamptz_to_str returns a static buffer */
2711
2712 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2717 replyTimeStr);
2718
2720 }
2721
2722 /*
2723 * Update shared state for this WalSender process based on reply data from
2724 * standby.
2725 */
2726 {
2728
2729 SpinLockAcquire(&walsnd->mutex);
2730 walsnd->replyTime = replyTime;
2731 SpinLockRelease(&walsnd->mutex);
2732 }
2733
2734 /*
2735 * Unset WalSender's xmins if the feedback message values are invalid.
2736 * This happens when the downstream turned hot_standby_feedback off.
2737 */
2740 {
2742 if (MyReplicationSlot != NULL)
2744 return;
2745 }
2746
2747 /*
2748 * Check that the provided xmin/epoch are sane, that is, not in the future
2749 * and not so far back as to be already wrapped around. Ignore if not.
2750 */
2753 return;
2754
2757 return;
2758
2759 /*
2760 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2761 * the xmin will be taken into account by GetSnapshotData() /
2762 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2763 * thereby prevent the generation of cleanup conflicts on the standby
2764 * server.
2765 *
2766 * There is a small window for a race condition here: although we just
2767 * checked that feedbackXmin precedes nextXid, the nextXid could have
2768 * gotten advanced between our fetching it and applying the xmin below,
2769 * perhaps far enough to make feedbackXmin wrap around. In that case the
2770 * xmin we set here would be "in the future" and have no effect. No point
2771 * in worrying about this since it's too late to save the desired data
2772 * anyway. Assuming that the standby sends us an increasing sequence of
2773 * xmins, this could only happen during the first reply cycle, else our
2774 * own xmin would prevent nextXid from advancing so far.
2775 *
2776 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2777 * is assumed atomic, and there's no real need to prevent concurrent
2778 * horizon determinations. (If we're moving our xmin forward, this is
2779 * obviously safe, and if we're moving it backwards, well, the data is at
2780 * risk already since a VACUUM could already have determined the horizon.)
2781 *
2782 * If we're using a replication slot we reserve the xmin via that,
2783 * otherwise via the walsender's PGPROC entry. We can only track the
2784 * catalog xmin separately when using a slot, so we store the least of the
2785 * two provided when not using a slot.
2786 *
2787 * XXX: It might make sense to generalize the ephemeral slot concept and
2788 * always use the slot mechanism to handle the feedback xmin.
2789 */
2790 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2792 else
2793 {
2797 else
2799 }
2800}
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1856
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
bool message_level_is_interesting(int elevel)
Definition elog.c:284
#define DEBUG2
Definition elog.h:30
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition walsender.c:2606
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition walsender.c:2655

References DEBUG2, elog, fb(), InvalidTransactionId, message_level_is_interesting(), MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, SpinLockAcquire(), SpinLockRelease(), timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2430 of file walsender.c.

2431{
2432 char msgtype;
2433
2434 /*
2435 * Check message type from the first byte.
2436 */
2438
2439 switch (msgtype)
2440 {
2443 break;
2444
2447 break;
2448
2451 break;
2452
2453 default:
2456 errmsg("unexpected message type \"%c\"", msgtype)));
2457 proc_exit(0);
2458 }
2459}
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static void ProcessStandbyHSFeedbackMessage(void)
Definition walsender.c:2686
static void ProcessStandbyPSRequestMessage(void)
Definition walsender.c:2806
static void ProcessStandbyReplyMessage(void)
Definition walsender.c:2498

References COMMERROR, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, fb(), pq_getmsgbyte(), PqReplMsg_HotStandbyFeedback, PqReplMsg_PrimaryStatusRequest, PqReplMsg_StandbyStatusUpdate, proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyPSRequestMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyPSRequestMessage()

static void ProcessStandbyPSRequestMessage ( void  )
static

Definition at line 2806 of file walsender.c.

2807{
2814 TimestampTz replyTime;
2815
2816 /*
2817 * This shouldn't happen because we don't support getting a primary status
2818 * message from a standby.
2819 */
2820 if (RecoveryInProgress())
2821 elog(ERROR, "the primary status is unavailable during recovery");
2822
2823 replyTime = pq_getmsgint64(&reply_message);
2824
2825 /*
2826 * Update shared state for this WalSender process based on reply data from
2827 * standby.
2828 */
2829 SpinLockAcquire(&walsnd->mutex);
2830 walsnd->replyTime = replyTime;
2831 SpinLockRelease(&walsnd->mutex);
2832
2833 /*
2834 * Consider transactions in the current database, as only these are the
2835 * ones replicated.
2836 */
2839
2840 /*
2841 * Update the oldest xid for standby transmission if an older prepared
2842 * transaction exists and is currently in commit phase.
2843 */
2847
2851 lsn = GetXLogWriteRecPtr();
2852
2853 elog(DEBUG2, "sending primary status");
2854
2855 /* construct the message... */
2862
2863 /* ... and send it wrapped in CopyData */
2865}
int64_t int64
Definition c.h:621
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2845
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
Definition transam.h:441
#define U64FromFullTransactionId(x)
Definition transam.h:49
#define TransactionIdIsValid(xid)
Definition transam.h:41
TransactionId TwoPhaseGetOldestXidInCommit(void)
Definition twophase.c:2835
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:283
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:10140

References StringInfoData::data, DEBUG2, elog, ERROR, fb(), FullTransactionIdFromAllowableAt(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), GetXLogWriteRecPtr(), InvalidXLogRecPtr, StringInfoData::len, MyWalSnd, output_message, pq_getmsgint64(), pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_PrimaryStatusUpdate, ReadNextFullTransactionId(), RecoveryInProgress(), reply_message, resetStringInfo(), SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, TransactionIdPrecedes(), TwoPhaseGetOldestXidInCommit(), and U64FromFullTransactionId.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2498 of file walsender.c.

2499{
2501 flushPtr,
2502 applyPtr;
2503 bool replyRequested;
2504 TimeOffset writeLag,
2505 flushLag,
2506 applyLag;
2507 bool clearLagTimes;
2509 TimestampTz replyTime;
2510
2514
2515 /* the caller already consumed the msgtype byte */
2519 replyTime = pq_getmsgint64(&reply_message);
2521
2523 {
2524 char *replyTimeStr;
2525
2526 /* Copy because timestamptz_to_str returns a static buffer */
2528
2529 elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
2533 replyRequested ? " (reply requested)" : "",
2534 replyTimeStr);
2535
2537 }
2538
2539 /* See if we can compute the round-trip lag for these positions. */
2544
2545 /*
2546 * If the standby reports that it has fully replayed the WAL, and the
2547 * write/flush/apply positions remain unchanged across two consecutive
2548 * reply messages, forget the lag times measured when it last
2549 * wrote/flushed/applied a WAL record.
2550 *
2551 * The second message with unchanged positions typically results from
2552 * wal_receiver_status_interval expiring on the standby, so lag values are
2553 * usually cleared after that interval when there is no activity. This
2554 * avoids displaying stale lag data until more WAL traffic arrives.
2555 */
2559
2563
2564 /* Send a reply if the standby requested one. */
2565 if (replyRequested)
2567
2568 /*
2569 * Update shared state for this WalSender process based on reply data from
2570 * standby.
2571 */
2572 {
2574
2575 SpinLockAcquire(&walsnd->mutex);
2576 walsnd->write = writePtr;
2577 walsnd->flush = flushPtr;
2578 walsnd->apply = applyPtr;
2579 if (writeLag != -1 || clearLagTimes)
2580 walsnd->writeLag = writeLag;
2581 if (flushLag != -1 || clearLagTimes)
2582 walsnd->flushLag = flushLag;
2583 if (applyLag != -1 || clearLagTimes)
2584 walsnd->applyLag = applyLag;
2585 walsnd->replyTime = replyTime;
2586 SpinLockRelease(&walsnd->mutex);
2587 }
2588
2591
2592 /*
2593 * Advance our local xmin horizon when the client confirmed a flush.
2594 */
2596 {
2599 else
2601 }
2602}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition logical.c:1816
#define SlotIsLogical(slot)
Definition slot.h:288
void SyncRepReleaseWaiters(void)
Definition syncrep.c:487
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
static XLogRecPtr sentPtr
Definition walsender.c:190
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition walsender.c:2465
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition walsender.c:4326
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition walsender.c:4445

References am_cascading_walsender, DEBUG2, elog, fb(), GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, sentPtr, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 504 of file walsender.c.

505{
506#define READ_REPLICATION_SLOT_COLS 3
507 ReplicationSlot *slot;
510 TupleDesc tupdesc;
512 bool nulls[READ_REPLICATION_SLOT_COLS];
513
515 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
516 TEXTOID, -1, 0);
517 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
518 TEXTOID, -1, 0);
519 /* TimeLineID is unsigned, so int4 is not wide enough. */
520 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
521 INT8OID, -1, 0);
522 TupleDescFinalize(tupdesc);
523
524 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
525
527 slot = SearchNamedReplicationSlot(cmd->slotname, false);
528 if (slot == NULL || !slot->in_use)
529 {
531 }
532 else
533 {
535 int i = 0;
536
537 /* Copy slot contents while holding spinlock */
538 SpinLockAcquire(&slot->mutex);
539 slot_contents = *slot;
540 SpinLockRelease(&slot->mutex);
542
543 if (OidIsValid(slot_contents.data.database))
546 errmsg("cannot use %s with a logical replication slot",
547 "READ_REPLICATION_SLOT"));
548
549 /* slot type */
550 values[i] = CStringGetTextDatum("physical");
551 nulls[i] = false;
552 i++;
553
554 /* start LSN */
555 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
556 {
557 char xloc[64];
558
559 snprintf(xloc, sizeof(xloc), "%X/%08X",
560 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
562 nulls[i] = false;
563 }
564 i++;
565
566 /* timeline this WAL was produced on */
567 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
568 {
572
573 /*
574 * While in recovery, use as timeline the currently-replaying one
575 * to get the LSN position's history.
576 */
577 if (RecoveryInProgress())
579 else
581
586 nulls[i] = false;
587 }
588 i++;
589
591 }
592
595 do_tup_output(tstate, values, nulls);
597}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition timeline.c:77
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition timeline.c:545
#define OidIsValid(objectId)
Definition c.h:858
@ LW_SHARED
Definition lwlock.h:105
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:548
Definition pg_list.h:54
bool in_use
Definition slot.h:186
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg, ERROR, fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire(), SpinLockRelease(), tliOfPointInHistory(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsValid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 604 of file walsender.c.

605{
607 TupleDesc tupdesc;
610 char path[MAXPGPATH];
611 int fd;
614 Size len;
615
617
618 /*
619 * Reply with a result set with one row, and two columns. The first col is
620 * the name of the history file, 2nd is the contents.
621 */
622 tupdesc = CreateTemplateTupleDesc(2);
623 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
624 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
625 TupleDescFinalize(tupdesc);
626
628 TLHistoryFilePath(path, cmd->timeline);
629
630 /* Send a RowDescription message */
631 dest->rStartup(dest, CMD_SELECT, tupdesc);
632
633 /* Send a DataRow message */
635 pq_sendint16(&buf, 2); /* # of columns */
637 pq_sendint32(&buf, len); /* col1 len */
639
641 if (fd < 0)
644 errmsg("could not open file \"%s\": %m", path)));
645
646 /* Determine file length and send it to client */
648 if (histfilelen < 0)
651 errmsg("could not seek to end of file \"%s\": %m", path)));
652 if (lseek(fd, 0, SEEK_SET) != 0)
655 errmsg("could not seek to beginning of file \"%s\": %m", path)));
656
657 pq_sendint32(&buf, histfilelen); /* col2 len */
658
660 while (bytesleft > 0)
661 {
663 int nread;
664
666 nread = read(fd, rbuf.data, sizeof(rbuf));
668 if (nread < 0)
671 errmsg("could not read file \"%s\": %m",
672 path)));
673 else if (nread == 0)
676 errmsg("could not read file \"%s\": read %d of %zu",
677 path, nread, (Size) bytesleft)));
678
679 pq_sendbytes(&buf, rbuf.data, nread);
680 bytesleft -= nread;
681 }
682
683 if (CloseTransientFile(fd) != 0)
686 errmsg("could not close file \"%s\": %m", path)));
687
689}
#define PG_BINARY
Definition c.h:1374
size_t Size
Definition c.h:689
int errcode_for_file_access(void)
Definition elog.c:897
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define read(a, b, c)
Definition win32.h:13
@ CMD_SELECT
Definition nodes.h:275
#define ERRCODE_DATA_CORRUPTED
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition pqformat.h:136
static int fd(const char *x, int i)
#define PqMsg_DataRow
Definition protocol.h:43
TimeLineID timeline
Definition replnodes.h:120
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), TupleDescFinalize(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1485 of file walsender.c.

1486{
1488 QueryCompletion qc;
1489
1490 /* make sure that our requirements are still fulfilled */
1492
1494
1495 ReplicationSlotAcquire(cmd->slotname, true, true);
1496
1497 /*
1498 * Force a disconnect, so that the decoding code doesn't need to care
1499 * about an eventual switch from running in recovery, to running in a
1500 * normal environment. Client code is expected to handle reconnects.
1501 */
1503 {
1504 ereport(LOG,
1505 (errmsg("terminating walsender process after promotion")));
1506 got_STOPPING = true;
1507 }
1508
1509 /*
1510 * Create our decoding context, making it start at the previously ack'ed
1511 * position.
1512 *
1513 * Do this before sending a CopyBothResponse message, so that any errors
1514 * are reported early.
1515 */
1517 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1519 .segment_open = WalSndSegmentOpen,
1520 .segment_close = wal_segment_close),
1524
1526
1527 /* Send a CopyBothResponse message, and start streaming */
1529 pq_sendbyte(&buf, 0);
1530 pq_sendint16(&buf, 0);
1532 pq_flush();
1533
1534 /* Start reading WAL from the oldest required WAL. */
1537
1538 /*
1539 * Report the location after which we'll send out further commits as the
1540 * current sentPtr.
1541 */
1543
1544 /* Also update the sent position status in shared memory */
1548
1549 replication_active = true;
1550
1552
1553 /* Main loop of walsender */
1555
1558
1559 replication_active = false;
1560 if (got_STOPPING)
1561 proc_exit(0);
1563
1564 /* Get out of COPY mode (CommandComplete). */
1566 EndCommand(&qc, DestRemote, false);
1567}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition dest.c:169
@ DestRemote
Definition dest.h:89
#define pq_flush()
Definition libpq.h:49
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:494
#define PqMsg_CopyBothResponse
Definition protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:629
XLogReaderState * reader
Definition logical.h:42
XLogRecPtr startpoint
Definition replnodes.h:97
slock_t mutex
XLogRecPtr sentPtr
void SyncRepInitConfig(void)
Definition syncrep.c:458
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition walsender.c:3001
static LogicalDecodingContext * logical_decoding_ctx
Definition walsender.c:236
static void XLogSendLogical(void)
Definition walsender.c:3625
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition xlogreader.c:233

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg, fb(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire(), SpinLockRelease(), StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd * cmd)
static

Definition at line 837 of file walsender.c.

838{
842
843 /* create xlogreader for physical replication */
844 xlogreader =
846 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
847 .segment_close = wal_segment_close),
848 NULL);
849
850 if (!xlogreader)
853 errmsg("out of memory"),
854 errdetail("Failed while allocating a WAL reading processor.")));
855
856 /*
857 * We assume here that we're logging enough information in the WAL for
858 * log-shipping, since this is checked in PostmasterMain().
859 *
860 * NOTE: wal_level can only change at shutdown, so in most cases it is
861 * difficult for there to be WAL data that we can still see that was
862 * written at wal_level='minimal'.
863 */
864
865 if (cmd->slotname)
866 {
867 ReplicationSlotAcquire(cmd->slotname, true, true);
871 errmsg("cannot use a logical replication slot for physical replication")));
872
873 /*
874 * We don't need to verify the slot's restart_lsn here; instead we
875 * rely on the caller requesting the starting point to use. If the
876 * WAL segment doesn't exist, we'll fail later.
877 */
878 }
879
880 /*
881 * Select the timeline. If it was given explicitly by the client, use
882 * that. Otherwise use the timeline of the last replayed record.
883 */
887 else
889
890 if (cmd->timeline != 0)
891 {
893
894 sendTimeLine = cmd->timeline;
895 if (sendTimeLine == FlushTLI)
896 {
899 }
900 else
901 {
903
905
906 /*
907 * Check that the timeline the client requested exists, and the
908 * requested start location is on that timeline.
909 */
914
915 /*
916 * Found the requested timeline in the history. Check that
917 * requested startpoint is on that timeline in our history.
918 *
919 * This is quite loose on purpose. We only check that we didn't
920 * fork off the requested timeline before the switchpoint. We
921 * don't check that we switched *to* it before the requested
922 * starting point. This is because the client can legitimately
923 * request to start replication from the beginning of the WAL
924 * segment that contains switchpoint, but on the new timeline, so
925 * that it doesn't end up with a partial segment. If you ask for
926 * too old a starting point, you'll get an error later when we
927 * fail to find the requested WAL segment in pg_wal.
928 *
929 * XXX: we could be more strict here and only allow a startpoint
930 * that's older than the switchpoint, if it's still in the same
931 * WAL segment.
932 */
934 switchpoint < cmd->startpoint)
935 {
937 errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
939 cmd->timeline),
940 errdetail("This server's history forked from timeline %u at %X/%08X.",
941 cmd->timeline,
943 }
945 }
946 }
947 else
948 {
952 }
953
955
956 /* If there is nothing to stream, don't even enter COPY mode */
958 {
959 /*
960 * When we first start replication the standby will be behind the
961 * primary. For some applications, for example synchronous
962 * replication, it is important to have a clear state for this initial
963 * catchup mode, so we can trigger actions when we change streaming
964 * state later. We may stay in this state for a long time, which is
965 * exactly why we want to be able to monitor whether or not we are
966 * still here.
967 */
969
970 /* Send a CopyBothResponse message, and start streaming */
972 pq_sendbyte(&buf, 0);
973 pq_sendint16(&buf, 0);
975 pq_flush();
976
977 /*
978 * Don't allow a request to stream from a future point in WAL that
979 * hasn't been flushed to disk in this server yet.
980 */
981 if (FlushPtr < cmd->startpoint)
982 {
984 errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
987 }
988
989 /* Start streaming from the requested point */
990 sentPtr = cmd->startpoint;
991
992 /* Initialize shared memory status, too */
996
998
999 /* Main loop of walsender */
1000 replication_active = true;
1001
1003
1004 replication_active = false;
1005 if (got_STOPPING)
1006 proc_exit(0);
1008
1010 }
1011
1012 if (cmd->slotname)
1014
1015 /*
1016 * Copy is finished now. Send a single-row result set indicating the next
1017 * timeline.
1018 */
1020 {
1021 char startpos_str[8 + 1 + 8 + 1];
1024 TupleDesc tupdesc;
1025 Datum values[2];
1026 bool nulls[2] = {0};
1027
1028 snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
1030
1032
1033 /*
1034 * Need a tuple descriptor representing two columns. int8 may seem
1035 * like a surprising data type for this, but in theory int4 would not
1036 * be wide enough for this, as TimeLineID is unsigned.
1037 */
1038 tupdesc = CreateTemplateTupleDesc(2);
1039 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1040 INT8OID, -1, 0);
1041 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1042 TEXTOID, -1, 0);
1043 TupleDescFinalize(tupdesc);
1044
1045 /* prepare for projection of tuple */
1047
1050
1051 /* send it to dest */
1052 do_tup_output(tstate, values, nulls);
1053
1055 }
1056
1057 /* Send CommandComplete message */
1058 EndReplicationCommand("START_STREAMING");
1059}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition timeline.c:573
int errdetail(const char *fmt,...) pg_attribute_printf(1
void list_free_deep(List *list)
Definition list.c:1560
TimeLineID timeline
Definition replnodes.h:96
static void XLogSendPhysical(void)
Definition walsender.c:3315
int wal_segment_size
Definition xlog.c:150
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition xlogreader.c:108

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire(), SpinLockRelease(), StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsValid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2655 of file walsender.c.

2656{
2658 TransactionId nextXid;
2660
2664
2665 if (xid <= nextXid)
2666 {
2667 if (epoch != nextEpoch)
2668 return false;
2669 }
2670 else
2671 {
2672 if (epoch + 1 != nextEpoch)
2673 return false;
2674 }
2675
2676 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2677 return false; /* epoch OK, but it's wrapped around */
2678
2679 return true;
2680}
#define EpochFromFullTransactionId(x)
Definition transam.h:47
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define XidFromFullTransactionId(x)
Definition transam.h:48
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, fb(), ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 695 of file walsender.c.

696{
697 MemoryContext mcxt;
699 off_t offset = 0;
701
702 /*
703 * parsing the manifest will use the cryptohash stuff, which requires a
704 * resource owner
705 */
710
711 /* Prepare to read manifest data into a temporary context. */
713 "incremental backup information",
716
717 /* Send a CopyInResponse message */
719 pq_sendbyte(&buf, 0);
720 pq_sendint16(&buf, 0);
722 pq_flush();
723
724 /* Receive packets from client until done. */
725 while (HandleUploadManifestPacket(&buf, &offset, ib))
726 ;
727
728 /* Finish up manifest processing. */
730
731 /*
732 * Discard any old manifest information and arrange to preserve the new
733 * information we just got.
734 *
735 * We assume that MemoryContextDelete and MemoryContextSetParent won't
736 * fail, and thus we shouldn't end up bailing out of here in such a way as
737 * to leave dangling pointers.
738 */
744
745 /* clean up the resource owner we created */
747}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:686
MemoryContext CacheMemoryContext
Definition mcxt.c:169
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
void pq_endmessage_reuse(StringInfo buf)
Definition pqformat.c:313
#define PqMsg_CopyInResponse
Definition protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition resowner.c:1016
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition walsender.c:761
static MemoryContext uploaded_manifest_mcxt
Definition walsender.c:173

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, fb(), FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckShutdownTimeout()

static void WalSndCheckShutdownTimeout ( void  )
static

Definition at line 2965 of file walsender.c.

2966{
2968
2969 /* Do nothing if shutdown has not been requested yet */
2970 if (!(got_STOPPING || got_SIGUSR2))
2971 return;
2972
2973 /* Terminate immediately if the timeout is set to 0 */
2976
2977 /*
2978 * Record the shutdown request timestamp even if
2979 * wal_sender_shutdown_timeout is disabled (-1), since the setting may
2980 * change during shutdown and the timestamp will be needed in that case.
2981 */
2983 {
2985 return;
2986 }
2987
2988 /* Do not check the timeout if it's disabled */
2990 return;
2991
2992 /* Terminate immediately if the timeout expires */
2997}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1775
static volatile sig_atomic_t got_SIGUSR2
Definition walsender.c:225
int wal_sender_shutdown_timeout
Definition walsender.c:146
static TimestampTz shutdown_request_timestamp
Definition walsender.c:210
static pg_noreturn void WalSndDoneImmediate(void)
Definition walsender.c:3712

References GetCurrentTimestamp(), got_SIGUSR2, got_STOPPING, now(), shutdown_request_timestamp, TimestampDifferenceExceeds(), wal_sender_shutdown_timeout, and WalSndDoneImmediate().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2935 of file walsender.c.

2936{
2938
2939 /* don't bail out if we're doing something that doesn't require timeouts */
2940 if (last_reply_timestamp <= 0)
2941 return;
2942
2945
2947 {
2948 /*
2949 * Since typically expiration of replication timeout means
2950 * communication problem, we don't send the error message to the
2951 * standby.
2952 */
2954 (errmsg("terminating walsender process due to replication timeout")));
2955
2957 }
2958}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
int wal_sender_timeout
Definition walsender.c:143

References COMMERROR, ereport, errmsg, fb(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2878 of file walsender.c.

2879{
2881 long sleeptime = 10000; /* 10 s */
2882
2884 {
2885 /*
2886 * At the latest stop sleeping once wal_sender_timeout has been
2887 * reached.
2888 */
2891
2892 /*
2893 * If no ping has been sent yet, wakeup when it's time to do so.
2894 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2895 * the timeout passed without a response.
2896 */
2899 wal_sender_timeout / 2);
2900
2901 /* Compute relative time until wakeup. */
2903 }
2904
2906 {
2907 long shutdown_sleeptime;
2908
2911
2913
2914 /* Choose the earliest wakeup. */
2917 }
2918
2919 return sleeptime;
2920}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751

References fb(), last_reply_timestamp, now(), shutdown_request_timestamp, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_shutdown_timeout, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3761 of file walsender.c.

3762{
3764
3765 /* ... let's just be real sure we're caught up ... */
3766 send_data();
3767
3768 /*
3769 * To figure out whether all WAL has successfully been replicated, check
3770 * flush location if valid, write otherwise. Tools like pg_receivewal will
3771 * usually (unless in synchronous mode) return an invalid flush location.
3772 */
3775
3778 {
3779 QueryCompletion qc;
3780
3781 /* Inform the standby that XLOG streaming is done */
3783 EndCommand(&qc, DestRemote, false);
3784 pq_flush();
3785
3786 proc_exit(0);
3787 }
3790}
XLogRecPtr flush
XLogRecPtr write
static bool WalSndCaughtUp
Definition walsender.c:222

References DestRemote, EndCommand(), fb(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsValid.

Referenced by WalSndLoop().

◆ WalSndDoneImmediate()

static void WalSndDoneImmediate ( void  )
static

Definition at line 3712 of file walsender.c.

3713{
3715
3716 if (state == WALSNDSTATE_CATCHUP ||
3719 {
3720 QueryCompletion qc;
3721
3722 /* Try to inform receiver that XLOG streaming is done */
3724 EndCommand(&qc, DestRemote, false);
3725
3726 /*
3727 * Note that the output buffer may be full during the forced shutdown
3728 * of walsender. If pq_flush() is called at that time, the walsender
3729 * process will be stuck. Therefore, call pq_flush_if_writable()
3730 * instead. Successful reception of the done message with the
3731 * walsender forced into a shutdown is not guaranteed.
3732 */
3734 }
3735
3736 /*
3737 * Prevent ereport from attempting to send any more messages to the
3738 * standby. Otherwise, it can cause the process to get stuck if the output
3739 * buffers are full.
3740 */
3743
3745 (errmsg("terminating walsender process due to replication shutdown timeout"),
3746 errdetail("Walsender process might have been terminated before all WAL data was replicated to the receiver.")));
3747
3748 proc_exit(0);
3749}
@ DestNone
Definition dest.h:87
CommandDest whereToSendOutput
Definition postgres.c:97
@ WALSNDSTATE_STREAMING

References DestNone, DestRemote, EndCommand(), ereport, errdetail(), errmsg, fb(), MyWalSnd, pq_flush_if_writable, proc_exit(), SetQueryCompletion(), WalSnd::state, WALSNDSTATE_CATCHUP, WALSNDSTATE_STOPPING, WALSNDSTATE_STREAMING, WARNING, and whereToSendOutput.

Referenced by WalSndCheckShutdownTimeout().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 370 of file walsender.c.

371{
376
377 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
379
380 if (MyReplicationSlot != NULL)
382
384
385 replication_active = false;
386
387 /*
388 * If there is a transaction in progress, it will clean up our
389 * ResourceOwner, but if a replication command set up a resource owner
390 * without a transaction, we've got to clean that up now.
391 */
394
396 proc_exit(0);
397
398 /* Revert back to startup state */
400}
void pgaio_error_cleanup(void)
Definition aio.c:1175
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition lwlock.c:1866
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:868
WALOpenSegment seg
Definition xlogreader.h:271
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5040

References ConditionVariableCancelSleep(), fb(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 4120 of file walsender.c.

4121{
4122 switch (state)
4123 {
4125 return "startup";
4126 case WALSNDSTATE_BACKUP:
4127 return "backup";
4129 return "catchup";
4131 return "streaming";
4133 return "stopping";
4134 }
4135 return "UNKNOWN";
4136}
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndHandleConfigReload()

static void WalSndHandleConfigReload ( void  )
static

Definition at line 1650 of file walsender.c.

1651{
1653 return;
1654
1655 ConfigReloadPending = false;
1658
1659 /*
1660 * Recheck and release any now-satisfied waiters after config reload
1661 * changes synchronous replication requirements (e.g., reducing the number
1662 * of sync standbys or changing the standby names).
1663 */
1666}
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27

References am_cascading_walsender, ConfigReloadPending, PGC_SIGHUP, ProcessConfigFile(), SyncRepInitConfig(), and SyncRepReleaseWaiters().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 4037 of file walsender.c.

4038{
4039 int i;
4040
4041 for (i = 0; i < max_wal_senders; i++)
4042 {
4044 pid_t pid;
4045
4046 SpinLockAcquire(&walsnd->mutex);
4047 pid = walsnd->pid;
4048 SpinLockRelease(&walsnd->mutex);
4049
4050 if (pid == 0)
4051 continue;
4052
4054 }
4055}
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:288
@ PROCSIG_WALSND_INIT_STOPPING
Definition procsignal.h:35

References fb(), i, INVALID_PROC_NUMBER, max_wal_senders, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4349 of file walsender.c.

4350{
4352
4353 /*
4354 * Don't send keepalive messages if timeouts are globally disabled or
4355 * we're doing something not partaking in timeouts.
4356 */
4358 return;
4359
4361 return;
4362
4363 /*
4364 * If half of wal_sender_timeout has lapsed without receiving any reply
4365 * from the standby, send a keep-alive message to the standby requesting
4366 * an immediate reply.
4367 */
4369 wal_sender_timeout / 2);
4371 {
4373
4374 /* Try to flush pending output to the client */
4375 if (pq_flush_if_writable() != 0)
4377 }
4378}

References fb(), InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3221 of file walsender.c.

3222{
3224
3225 Assert(walsnd != NULL);
3226
3227 MyWalSnd = NULL;
3228
3229 SpinLockAcquire(&walsnd->mutex);
3230 /* Mark WalSnd struct as no longer being in use. */
3231 walsnd->pid = 0;
3232 SpinLockRelease(&walsnd->mutex);
3233}

References Assert, fb(), MyWalSnd, SpinLockAcquire(), and SpinLockRelease().

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3884 of file walsender.c.

3885{
3886 got_SIGUSR2 = true;
3888}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 3001 of file walsender.c.

3002{
3004
3005 /*
3006 * Initialize the last reply timestamp. That enables timeout processing
3007 * from hereon.
3008 */
3011
3012 /*
3013 * Loop until we reach the end of this timeline or the client requests to
3014 * stop streaming.
3015 */
3016 for (;;)
3017 {
3018 /* Clear any already-pending wakeups */
3020
3022
3023 /* Process any requests or signals received recently */
3025
3026 /* Check for input from the client */
3028
3029 /*
3030 * If we have received CopyDone from the client, sent CopyDone
3031 * ourselves, and the output buffer is empty, it's time to exit
3032 * streaming.
3033 */
3036 break;
3037
3038 /*
3039 * If we don't have any pending data in the output buffer, try to send
3040 * some more. If there is some, we don't bother to call send_data
3041 * again until we've flushed it ... but we'd better assume we are not
3042 * caught up.
3043 */
3044 if (!pq_is_send_pending())
3045 send_data();
3046 else
3047 WalSndCaughtUp = false;
3048
3049 /* Try to flush pending output to the client */
3050 if (pq_flush_if_writable() != 0)
3052
3053 /* If nothing remains to be sent right now ... */
3055 {
3056 /*
3057 * If we're in catchup state, move to streaming. This is an
3058 * important state change for users to know about, since before
3059 * this point data loss might occur if the primary dies and we
3060 * need to failover to the standby. The state change is also
3061 * important for synchronous replication, since commits that
3062 * started to wait at that point might wait for some time.
3063 */
3065 {
3067 (errmsg_internal("\"%s\" has now caught up with upstream server",
3070 }
3071
3072 /*
3073 * When SIGUSR2 arrives, we send any outstanding logs up to the
3074 * shutdown checkpoint record (i.e., the latest record), wait for
3075 * them to be replicated to the standby, and exit. This may be a
3076 * normal termination at shutdown, or a promotion, the walsender
3077 * is not sure which.
3078 */
3079 if (got_SIGUSR2)
3081 }
3082
3083 /* Check for replication timeout. */
3085
3086 /*
3087 * During shutdown, die if the shutdown timeout expires. Call this
3088 * before WalSndComputeSleeptime() so the timeout is considered when
3089 * computing sleep time.
3090 */
3092
3093 /* Send keepalive if the time has come */
3095
3096 /*
3097 * Block if we have unsent data. XXX For logical replication, let
3098 * WalSndWaitForWal() handle any other blocking; idle receivers need
3099 * its additional actions. For physical replication, also block if
3100 * caught up; its send_data does not block.
3101 *
3102 * The IO statistics are reported in WalSndWaitForWal() for the
3103 * logical WAL senders.
3104 */
3108 {
3109 long sleeptime;
3110 int wakeEvents;
3112
3115 else
3116 wakeEvents = 0;
3117
3118 /*
3119 * Use fresh timestamp, not last_processing, to reduce the chance
3120 * of reaching wal_sender_timeout before sending a keepalive.
3121 */
3124
3125 if (pq_is_send_pending())
3127
3128 /* Report IO statistics, if needed */
3131 {
3132 pgstat_flush_io(false);
3134 last_flush = now;
3135 }
3136
3137 /* Sleep until something happens or we time out */
3139 }
3140 }
3141}
char * application_name
Definition guc_tables.c:589
bool pgstat_flush_backend(bool nowait, uint32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition pgstat_io.c:175
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition walsender.c:107
static void WalSndDone(WalSndSendDataCallback send_data)
Definition walsender.c:3761

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg_internal(), fb(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1578 of file walsender.c.

1579{
1580 /* can't have sync rep confused by sending the same LSN several times */
1581 if (!last_write)
1582 lsn = InvalidXLogRecPtr;
1583
1584 resetStringInfo(ctx->out);
1585
1587 pq_sendint64(ctx->out, lsn); /* dataStart */
1588 pq_sendint64(ctx->out, lsn); /* walEnd */
1589
1590 /*
1591 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1592 * reserve space here.
1593 */
1594 pq_sendint64(ctx->out, 0); /* sendtime */
1595}
#define PqReplMsg_WALData
Definition protocol.h:77

References fb(), InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), PqReplMsg_WALData, and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3837 of file walsender.c.

3838{
3839 int i;
3840
3841 for (i = 0; i < max_wal_senders; i++)
3842 {
3844
3845 SpinLockAcquire(&walsnd->mutex);
3846 if (walsnd->pid == 0)
3847 {
3848 SpinLockRelease(&walsnd->mutex);
3849 continue;
3850 }
3851 walsnd->needreload = true;
3852 SpinLockRelease(&walsnd->mutex);
3853 }
3854}

References fb(), i, max_wal_senders, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState * state,
XLogSegNo  nextSegNo,
TimeLineID * tli_p 
)
static

Definition at line 3237 of file walsender.c.

3239{
3240 char path[MAXPGPATH];
3241
3242 /*-------
3243 * When reading from a historic timeline, and there is a timeline switch
3244 * within this segment, read from the WAL segment belonging to the new
3245 * timeline.
3246 *
3247 * For example, imagine that this server is currently on timeline 5, and
3248 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3249 * 0/13002088. In pg_wal, we have these files:
3250 *
3251 * ...
3252 * 000000040000000000000012
3253 * 000000040000000000000013
3254 * 000000050000000000000013
3255 * 000000050000000000000014
3256 * ...
3257 *
3258 * In this situation, when requested to send the WAL from segment 0x13, on
3259 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3260 * recovery prefers files from newer timelines, so if the segment was
3261 * restored from the archive on this server, the file belonging to the old
3262 * timeline, 000000040000000000000013, might not exist. Their contents are
3263 * equal up to the switchpoint, because at a timeline switch, the used
3264 * portion of the old segment is copied to the new file.
3265 */
3268 {
3270
3271 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3272 if (nextSegNo == endSegNo)
3274 }
3275
3276 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3277 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3278 if (state->seg.ws_file >= 0)
3279 return;
3280
3281 /*
3282 * If the file is not found, assume it's because the standby asked for a
3283 * too old WAL segment that has already been removed or recycled.
3284 */
3285 if (errno == ENOENT)
3286 {
3287 char xlogfname[MAXFNAMELEN];
3288 int save_errno = errno;
3289
3291 errno = save_errno;
3292 ereport(ERROR,
3294 errmsg("requested WAL segment %s has already been removed",
3295 xlogfname)));
3296 }
3297 else
3298 ereport(ERROR,
3300 errmsg("could not open file \"%s\": %m",
3301 path)));
3302}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition fd.c:1090
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg, ERROR, fb(), MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 4101 of file walsender.c.

4102{
4104
4106
4107 if (walsnd->state == state)
4108 return;
4109
4110 SpinLockAcquire(&walsnd->mutex);
4111 walsnd->state = state;
4112 SpinLockRelease(&walsnd->mutex);
4113}

References am_walsender, Assert, fb(), MyWalSnd, SpinLockAcquire(), and SpinLockRelease().

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

static void WalSndShmemInit ( void * arg)
static

Definition at line 3925 of file walsender.c.

3926{
3927 for (int i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3929
3930 for (int i = 0; i < max_wal_senders; i++)
3931 {
3933
3934 SpinLockInit(&walsnd->mutex);
3935 }
3936
3940}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition ilist.h:314
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv

References ConditionVariableInit(), dlist_init(), fb(), i, max_wal_senders, NUM_SYNC_REP_WAIT_MODE, SpinLockInit(), WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WalSndCtlData::walsnds.

◆ WalSndShmemRequest()

static void WalSndShmemRequest ( void * arg)
static

Definition at line 3911 of file walsender.c.

3912{
3913 Size size;
3914
3915 size = offsetof(WalSndCtlData, walsnds);
3916 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3917 ShmemRequestStruct(.name = "Wal Sender Ctl",
3918 .size = size,
3919 .ptr = (void **) &WalSndCtl,
3920 );
3921}
Size add_size(Size s1, Size s2)
Definition shmem.c:1048
Size mul_size(Size s1, Size s2)
Definition shmem.c:1063
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References add_size(), fb(), max_wal_senders, mul_size(), name, ShmemRequestStruct, and WalSndCtl.

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 406 of file walsender.c.

407{
408 /*
409 * Reset whereToSendOutput to prevent ereport from attempting to send any
410 * more messages to the standby.
411 */
414
415 proc_exit(0);
416}

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3892 of file walsender.c.

3893{
3894 /* Set up signal handlers */
3896 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3897 pqsignal(SIGTERM, die); /* request shutdown */
3898 /* SIGQUIT handler was already set up by InitPostmasterChild */
3899 InitializeTimeouts(); /* establishes SIGALRM handler */
3902 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3903 * shutdown */
3904
3905 /* Reset some signals that are accepted by postmaster but not here */
3907}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
#define die(msg)
#define pqsignal
Definition port.h:547
void StatementCancelHandler(SIGNAL_ARGS)
Definition postgres.c:3053
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:688
void InitializeTimeouts(void)
Definition timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition walsender.c:3884
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGUSR2
Definition win32_port.h:171

References die, fb(), InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1729 of file walsender.c.

1731{
1732 static TimestampTz sendTime = 0;
1734 bool pending_writes = false;
1735 bool end_xact = ctx->end_xact;
1736
1737 /*
1738 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1739 * avoid flooding the lag tracker when we commit frequently.
1740 *
1741 * We don't have a mechanism to get the ack for any LSN other than end
1742 * xact LSN from the downstream. So, we track lag only for end of
1743 * transaction LSN.
1744 */
1745#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1746 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1748 {
1749 LagTrackerWrite(lsn, now);
1750 sendTime = now;
1751 }
1752
1753 /*
1754 * When skipping empty transactions in synchronous replication, we send a
1755 * keepalive message to avoid delaying such transactions.
1756 *
1757 * It is okay to check sync_standbys_status without lock here as in the
1758 * worst case we will just send an extra keepalive message when it is
1759 * really not required.
1760 */
1761 if (skipped_xact &&
1762 SyncRepRequested() &&
1763 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1764 {
1765 WalSndKeepalive(false, lsn);
1766
1767 /* Try to flush pending output to the client */
1768 if (pq_flush_if_writable() != 0)
1770
1771 /* If we have pending write here, make sure it's actually flushed */
1772 if (pq_is_send_pending())
1773 pending_writes = true;
1774 }
1775
1776 /*
1777 * Process pending writes if any or try to send a keepalive if required.
1778 * We don't need to try sending keep alive messages at the transaction end
1779 * as that will be done at a later point in time. This is required only
1780 * for large transactions where we don't send any changes to the
1781 * downstream and the receiver can timeout due to that.
1782 */
1783 if (pending_writes || (!end_xact &&
1785 wal_sender_timeout / 2)))
1787}
#define SyncRepRequested()
Definition syncrep.h:18
static void ProcessPendingWrites(void)
Definition walsender.c:1673
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition walsender.c:4387
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, fb(), GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3979 of file walsender.c.

3980{
3981 WaitEvent event;
3982
3984
3985 /*
3986 * We use a condition variable to efficiently wake up walsenders in
3987 * WalSndWakeup().
3988 *
3989 * Every walsender prepares to sleep on a shared memory CV. Note that it
3990 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3991 * waitlist), but does not actually wait on the CV (IOW, it never calls
3992 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3993 * waiting, because we also need to wait for socket events. The processes
3994 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3995 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3996 * walsenders come out of WaitEventSetWait().
3997 *
3998 * This approach is simple and efficient because one doesn't have to loop
3999 * through all the walsender slots, with a spinlock acquisition and
4000 * release for every iteration, just to wake up only the waiting
4001 * walsenders. It makes WalSndWakeup() callers' life easy.
4002 *
4003 * XXX: A desirable future improvement would be to add support for CVs
4004 * into WaitEventSetWait().
4005 *
4006 * And, we use separate shared memory CVs for physical and logical
4007 * walsenders for selective wake ups, see WalSndWakeup() for more details.
4008 *
4009 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
4010 * until awakened by physical walsenders after the walreceiver confirms
4011 * the receipt of the LSN.
4012 */
4019
4020 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
4021 (event.events & WL_POSTMASTER_DEATH))
4022 {
4024 proc_exit(1);
4025 }
4026
4028}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition libpq.h:66
WaitEventSet * FeBeWaitSet
Definition pqcomm.c:167
uint32 events
ReplicationKind kind
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, fb(), FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1879 of file walsender.c.

1880{
1881 int wakeEvents;
1882 uint32 wait_event = 0;
1885
1886 /*
1887 * Fast path to avoid acquiring the spinlock in case we already know we
1888 * have enough WAL available and all the standby servers have confirmed
1889 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1890 * if we're far behind.
1891 */
1894 return RecentFlushPtr;
1895
1896 /*
1897 * Within the loop, we wait for the necessary WALs to be flushed to disk
1898 * first, followed by waiting for standbys to catch up if there are enough
1899 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1900 */
1901 for (;;)
1902 {
1903 bool wait_for_standby_at_stop = false;
1904 long sleeptime;
1906
1907 /* Clear any already-pending wakeups */
1909
1911
1912 /* Process any requests or signals received recently */
1914
1915 /* Check for input from the client */
1917
1918 /*
1919 * If we're shutting down, trigger pending WAL to be written out,
1920 * otherwise we'd possibly end up waiting for WAL that never gets
1921 * written, because walwriter has shut down already.
1922 *
1923 * Note that GetXLogInsertEndRecPtr() is used to obtain the WAL flush
1924 * request location instead of GetXLogInsertRecPtr(). Because if the
1925 * last WAL record ends at a page boundary, GetXLogInsertRecPtr() can
1926 * return an LSN pointing past the page header, which may cause
1927 * XLogFlush() to report an error.
1928 */
1931
1932 /*
1933 * To avoid the scenario where standbys need to catch up to a newer
1934 * WAL location in each iteration, we update our idea of the currently
1935 * flushed position only if we are not waiting for standbys to catch
1936 * up.
1937 */
1939 {
1940 if (!RecoveryInProgress())
1942 else
1944 }
1945
1946 /*
1947 * If postmaster asked us to stop and the standby slots have caught up
1948 * to the flushed position, don't wait anymore.
1949 *
1950 * It's important to do this check after the recomputation of
1951 * RecentFlushPtr, so we can send all remaining data before shutting
1952 * down.
1953 */
1954 if (got_STOPPING)
1955 {
1958 else
1959 break;
1960 }
1961
1962 /*
1963 * We only send regular messages to the client for full decoded
1964 * transactions, but synchronous replication and walsender shutdown
1965 * possibly are waiting for a later location. So, before sleeping, we
1966 * send a ping containing the flush location. If the receiver is
1967 * otherwise idle, this keepalive will trigger a reply. Processing the
1968 * reply will update these MyWalSnd locations.
1969 */
1970 if (MyWalSnd->flush < sentPtr &&
1971 MyWalSnd->write < sentPtr &&
1974
1975 /*
1976 * Exit the loop if already caught up and doesn't need to wait for
1977 * standby slots.
1978 */
1981 break;
1982
1983 /*
1984 * Waiting for new WAL or waiting for standbys to catch up. Since we
1985 * need to wait, we're now caught up.
1986 */
1987 WalSndCaughtUp = true;
1988
1989 /*
1990 * Try to flush any pending output to the client.
1991 */
1992 if (pq_flush_if_writable() != 0)
1994
1995 /*
1996 * If we have received CopyDone from the client, sent CopyDone
1997 * ourselves, and the output buffer is empty, it's time to exit
1998 * streaming, so fail the current WAL fetch request.
1999 */
2002 break;
2003
2004 /* die if timeout was reached */
2006
2007 /*
2008 * During shutdown, die if the shutdown timeout expires. Call this
2009 * before WalSndComputeSleeptime() so the timeout is considered when
2010 * computing sleep time.
2011 */
2013
2014 /* Send keepalive if the time has come */
2016
2017 /*
2018 * Sleep until something happens or we time out. Also wait for the
2019 * socket becoming writable, if there's still pending output.
2020 * Otherwise we might sit on sendable output data while waiting for
2021 * new WAL to be generated. (But if we have nothing to send, we don't
2022 * want to wake on socket-writable.)
2023 */
2026
2028
2029 if (pq_is_send_pending())
2031
2032 Assert(wait_event != 0);
2033
2034 /* Report IO statistics, if needed */
2037 {
2038 pgstat_flush_io(false);
2040 last_flush = now;
2041 }
2042
2044 }
2045
2046 /* reactivate latch so WalSndLoop knows to continue */
2048 return RecentFlushPtr;
2049}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1851
XLogRecPtr GetXLogInsertEndRecPtr(void)
Definition xlog.c:10124
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801

References Assert, CHECK_FOR_INTERRUPTS, fb(), WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogInsertEndRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogFlush(), and XLogRecPtrIsValid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 4063 of file walsender.c.

4064{
4065 for (;;)
4066 {
4067 int i;
4068 bool all_stopped = true;
4069
4070 for (i = 0; i < max_wal_senders; i++)
4071 {
4073
4074 SpinLockAcquire(&walsnd->mutex);
4075
4076 if (walsnd->pid == 0)
4077 {
4078 SpinLockRelease(&walsnd->mutex);
4079 continue;
4080 }
4081
4082 if (walsnd->state != WALSNDSTATE_STOPPING)
4083 {
4084 all_stopped = false;
4085 SpinLockRelease(&walsnd->mutex);
4086 break;
4087 }
4088 SpinLockRelease(&walsnd->mutex);
4089 }
4090
4091 /* safe to leave if confirmation is done for all WAL senders */
4092 if (all_stopped)
4093 return;
4094
4095 pg_usleep(10000L); /* wait for 10 msec */
4096 }
4097}
void pg_usleep(long microsec)
Definition signal.c:53

References fb(), i, max_wal_senders, pg_usleep(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3958 of file walsender.c.

3959{
3960 /*
3961 * Wake up all the walsenders waiting on WAL being flushed or replayed
3962 * respectively. Note that waiting walsender would have prepared to sleep
3963 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3964 * before actually waiting.
3965 */
3966 if (physical)
3968
3969 if (logical)
3971}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1605 of file walsender.c.

1607{
1609
1610 /*
1611 * Fill the send timestamp last, so that it is taken as late as possible.
1612 * This is somewhat ugly, but the protocol is set as it's already used for
1613 * several releases by streaming physical replication.
1614 */
1618 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1619 tmpbuf.data, sizeof(int64));
1620
1621 /* output previously gathered data in a CopyData packet */
1623
1625
1626 /* Try to flush pending output to the client */
1627 if (pq_flush_if_writable() != 0)
1629
1630 /* Try taking fast path unless we get too close to walsender timeout. */
1632 wal_sender_timeout / 2) &&
1634 {
1635 return;
1636 }
1637
1638 /* If we have pending write here, go to slow path */
1640}
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, memcpy(), now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), PqMsg_CopyData, ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3625 of file walsender.c.

3626{
3627 XLogRecord *record;
3628 char *errm;
3629
3630 /*
3631 * We'll use the current flush point to determine whether we've caught up.
3632 * This variable is static in order to cache it across calls. Caching is
3633 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3634 * spinlock.
3635 */
3637
3638 /*
3639 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3640 * true in WalSndWaitForWal, if we're actually waiting. We also set to
3641 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3642 * didn't wait - i.e. when we're shutting down.
3643 */
3644 WalSndCaughtUp = false;
3645
3647
3648 /* xlog record was invalid */
3649 if (errm != NULL)
3650 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3651 errm);
3652
3653 if (record != NULL)
3654 {
3655 /*
3656 * Note the lack of any call to LagTrackerWrite() which is handled by
3657 * WalSndUpdateProgress which is called by output plugin through
3658 * logical decoding write api.
3659 */
3661
3663 }
3664
3665 /*
3666 * If first time through in this session, initialize flushPtr. Otherwise,
3667 * we only need to update flushPtr if EndRecPtr is past it.
3668 */
3671 {
3672 /*
3673 * For cascading logical WAL senders, we use the replay LSN instead of
3674 * the flush LSN, since logical decoding on a standby only processes
3675 * WAL that has been replayed. This distinction becomes particularly
3676 * important during shutdown, as new WAL is no longer replayed and the
3677 * last replayed LSN marks the furthest point up to which decoding can
3678 * proceed.
3679 */
3682 else
3684 }
3685
3686 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3688 WalSndCaughtUp = true;
3689
3690 /*
3691 * If we're caught up and have been requested to stop, have WalSndLoop()
3692 * terminate the connection in an orderly manner, after writing out all
3693 * the pending data.
3694 */
3696 got_SIGUSR2 = true;
3697
3698 /* Update shared memory status */
3699 {
3701
3702 SpinLockAcquire(&walsnd->mutex);
3703 walsnd->sentPtr = sentPtr;
3704 SpinLockRelease(&walsnd->mutex);
3705 }
3706}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition decode.c:89
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:391

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, fb(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), MyWalSnd, LogicalDecodingContext::reader, sentPtr, SpinLockAcquire(), SpinLockRelease(), WalSndCaughtUp, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3315 of file walsender.c.

3316{
3318 XLogRecPtr startptr;
3319 XLogRecPtr endptr;
3320 Size nbytes;
3321 XLogSegNo segno;
3323 Size rbytes;
3324
3325 /* If requested switch the WAL sender to the stopping state. */
3326 if (got_STOPPING)
3328
3330 {
3331 WalSndCaughtUp = true;
3332 return;
3333 }
3334
3335 /* Figure out how far we can safely send the WAL. */
3337 {
3338 /*
3339 * Streaming an old timeline that's in this server's history, but is
3340 * not the one we're currently inserting or replaying. It can be
3341 * streamed up to the point where we switched off that timeline.
3342 */
3344 }
3345 else if (am_cascading_walsender)
3346 {
3348
3349 /*
3350 * Streaming the latest timeline on a standby.
3351 *
3352 * Attempt to send all WAL that has already been replayed, so that we
3353 * know it's valid. If we're receiving WAL through streaming
3354 * replication, it's also OK to send any WAL that has been received
3355 * but not replayed.
3356 *
3357 * The timeline we're recovering from can change, or we can be
3358 * promoted. In either case, the current timeline becomes historic. We
3359 * need to detect that so that we don't try to stream past the point
3360 * where we switched to another timeline. We check for promotion or
3361 * timeline switch after calculating FlushPtr, to avoid a race
3362 * condition: if the timeline becomes historic just after we checked
3363 * that it was still current, it's still OK to stream it up to the
3364 * FlushPtr that was calculated before it became historic.
3365 */
3366 bool becameHistoric = false;
3367
3369
3370 if (!RecoveryInProgress())
3371 {
3372 /* We have been promoted. */
3374 am_cascading_walsender = false;
3375 becameHistoric = true;
3376 }
3377 else
3378 {
3379 /*
3380 * Still a cascading standby. But is the timeline we're sending
3381 * still the one recovery is recovering from?
3382 */
3384 becameHistoric = true;
3385 }
3386
3387 if (becameHistoric)
3388 {
3389 /*
3390 * The timeline we were sending has become historic. Read the
3391 * timeline history file of the new timeline to see where exactly
3392 * we forked off from the timeline we were sending.
3393 */
3394 List *history;
3395
3398
3401
3403
3405 }
3406 }
3407 else
3408 {
3409 /*
3410 * Streaming the current timeline on a primary.
3411 *
3412 * Attempt to send all data that's already been written out and
3413 * fsync'd to disk. We cannot go further than what's been written out
3414 * given the current implementation of WALRead(). And in any case
3415 * it's unsafe to send WAL that is not securely down to disk on the
3416 * primary: if the primary subsequently crashes and restarts, standbys
3417 * must not have applied any WAL that got lost on the primary.
3418 */
3420 }
3421
3422 /*
3423 * Record the current system time as an approximation of the time at which
3424 * this WAL location was written for the purposes of lag tracking.
3425 *
3426 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3427 * is flushed and we could get that time as well as the LSN when we call
3428 * GetFlushRecPtr() above (and likewise for the cascading standby
3429 * equivalent), but rather than putting any new code into the hot WAL path
3430 * it seems good enough to capture the time here. We should reach this
3431 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3432 * may take some time, we read the WAL flush pointer and take the time
3433 * very close together here so that we'll get a later position if it is
3434 * still moving.
3435 *
3436 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3437 * this gives us a cheap approximation for the WAL flush time for this
3438 * LSN.
3439 *
3440 * Note that the LSN is not necessarily the LSN for the data contained in
3441 * the present message; it's the end of the WAL, which might be further
3442 * ahead. All the lag tracking machinery cares about is finding out when
3443 * that arbitrary LSN is eventually reported as written, flushed and
3444 * applied, so that it can measure the elapsed time.
3445 */
3447
3448 /*
3449 * If this is a historic timeline and we've reached the point where we
3450 * forked to the next timeline, stop streaming.
3451 *
3452 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3453 * startup process will normally replay all WAL that has been received
3454 * from the primary, before promoting, but if the WAL streaming is
3455 * terminated at a WAL page boundary, the valid portion of the timeline
3456 * might end in the middle of a WAL record. We might've already sent the
3457 * first half of that partial WAL record to the cascading standby, so that
3458 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3459 * replay the partial WAL record either, so it can still follow our
3460 * timeline switch.
3461 */
3463 {
3464 /* close the current file. */
3465 if (xlogreader->seg.ws_file >= 0)
3467
3468 /* Send CopyDone */
3470 streamingDoneSending = true;
3471
3472 WalSndCaughtUp = true;
3473
3474 elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
3477 return;
3478 }
3479
3480 /* Do we have any work to do? */
3482 if (SendRqstPtr <= sentPtr)
3483 {
3484 WalSndCaughtUp = true;
3485 return;
3486 }
3487
3488 /*
3489 * Figure out how much to send in one message. If there's no more than
3490 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3491 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3492 *
3493 * The rounding is not only for performance reasons. Walreceiver relies on
3494 * the fact that we never split a WAL record across two messages. Since a
3495 * long WAL record is split at page boundary into continuation records,
3496 * page boundary is always a safe cut-off point. We also assume that
3497 * SendRqstPtr never points to the middle of a WAL record.
3498 */
3499 startptr = sentPtr;
3500 endptr = startptr;
3501 endptr += MAX_SEND_SIZE;
3502
3503 /* if we went beyond SendRqstPtr, back off */
3504 if (SendRqstPtr <= endptr)
3505 {
3506 endptr = SendRqstPtr;
3508 WalSndCaughtUp = false;
3509 else
3510 WalSndCaughtUp = true;
3511 }
3512 else
3513 {
3514 /* round down to page boundary. */
3515 endptr -= (endptr % XLOG_BLCKSZ);
3516 WalSndCaughtUp = false;
3517 }
3518
3519 nbytes = endptr - startptr;
3520 Assert(nbytes <= MAX_SEND_SIZE);
3521
3522 /*
3523 * OK to read and send the slice.
3524 */
3527
3528 pq_sendint64(&output_message, startptr); /* dataStart */
3529 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3530 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3531
3532 /*
3533 * Read the log directly into the output buffer to avoid extra memcpy
3534 * calls.
3535 */
3537
3538retry:
3539 /* attempt to read WAL from WAL buffers first */
3541 startptr, nbytes, xlogreader->seg.ws_tli);
3543 startptr += rbytes;
3544 nbytes -= rbytes;
3545
3546 /* now read the remaining WAL from WAL file */
3547 if (nbytes > 0 &&
3550 startptr,
3551 nbytes,
3552 xlogreader->seg.ws_tli, /* Pass the current TLI because
3553 * only WalSndSegmentOpen controls
3554 * whether new TLI is needed. */
3555 &errinfo))
3557
3558 /* See logical_read_xlog_page(). */
3559 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3561
3562 /*
3563 * During recovery, the currently-open WAL file might be replaced with the
3564 * file of the same name retrieved from archive. So we always need to
3565 * check what we read was valid after reading into the buffer. If it's
3566 * invalid, we try to open and read the file again.
3567 */
3569 {
3571 bool reload;
3572
3573 SpinLockAcquire(&walsnd->mutex);
3574 reload = walsnd->needreload;
3575 walsnd->needreload = false;
3576 SpinLockRelease(&walsnd->mutex);
3577
3578 if (reload && xlogreader->seg.ws_file >= 0)
3579 {
3581
3582 goto retry;
3583 }
3584 }
3585
3586 output_message.len += nbytes;
3588
3589 /*
3590 * Fill the send timestamp last, so that it is taken as late as possible.
3591 */
3594 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3595 tmpbuf.data, sizeof(int64));
3596
3598
3599 sentPtr = endptr;
3600
3601 /* Update shared memory status */
3602 {
3604
3605 SpinLockAcquire(&walsnd->mutex);
3606 walsnd->sentPtr = sentPtr;
3607 SpinLockRelease(&walsnd->mutex);
3608 }
3609
3610 /* Report progress of XLOG streaming in PS display */
3612 {
3613 char activitymsg[50];
3614
3615 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
3618 }
3619}
bool update_process_title
Definition ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition stringinfo.c:337
TimeLineID ws_tli
Definition xlogreader.h:49
WALSegmentContext segcxt
Definition xlogreader.h:270
#define MAX_SEND_SIZE
Definition walsender.c:118
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition xlog.c:1789

References am_cascading_walsender, Assert, CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), fb(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, memcpy(), MyWalSnd, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqMsg_CopyDone, PqReplMsg_WALData, readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 138 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 272 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 236 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

◆ replication_active

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ shutdown_request_timestamp

TimestampTz shutdown_request_timestamp = 0
static

Definition at line 210 of file walsender.c.

Referenced by WalSndCheckShutdownTimeout(), and WalSndComputeSleeptime().

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 219 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 172 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 173 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 155 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_shutdown_timeout

int wal_sender_shutdown_timeout = -1

Definition at line 146 of file walsender.c.

Referenced by WalSndCheckShutdownTimeout(), and WalSndComputeSleeptime().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ WalSndShmemCallbacks

const ShmemCallbacks WalSndShmemCallbacks
Initial value:
= {
.request_fn = WalSndShmemRequest,
.init_fn = WalSndShmemInit,
}
static void WalSndShmemRequest(void *arg)
Definition walsender.c:3911
static void WalSndShmemInit(void *arg)
Definition walsender.c:3925

Definition at line 126 of file walsender.c.

126 {
127 .request_fn = WalSndShmemRequest,
128 .init_fn = WalSndShmemInit,
129};

◆ xlogreader