PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/subsystems.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndShmemRequest (void *arg)
 
static void WalSndShmemInit (void *arg)
 
static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static pg_noreturn void WalSndDoneImmediate (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessStandbyPSRequestMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndCheckShutdownTimeout (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void WalSndHandleConfigReload (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
const ShmemCallbacks WalSndShmemCallbacks
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
int wal_sender_shutdown_timeout = -1
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static TimestampTz shutdown_request_timestamp = 0
 
static bool shutdown_stream_done_queued = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 253 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 118 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 107 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 285 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd * cmd)
static

Definition at line 1450 of file walsender.c.

1451{
1452 bool failover_given = false;
1453 bool two_phase_given = false;
1454 bool failover;
1455 bool two_phase;
1456
1457 /* Parse options */
1459 {
1460 if (strcmp(defel->defname, "failover") == 0)
1461 {
1462 if (failover_given)
1463 ereport(ERROR,
1465 errmsg("conflicting or redundant options")));
1466 failover_given = true;
1468 }
1469 else if (strcmp(defel->defname, "two_phase") == 0)
1470 {
1471 if (two_phase_given)
1472 ereport(ERROR,
1474 errmsg("conflicting or redundant options")));
1475 two_phase_given = true;
1477 }
1478 else
1479 elog(ERROR, "unrecognized option: %s", defel->defname);
1480 }
1481
1485}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errcode(int sqlerrcode)
Definition elog.c:875
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
static char * errmsg
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
static bool two_phase
static bool failover
static int fb(int x)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:960

References defGetBoolean(), elog, ereport, errcode(), errmsg, ERROR, failover, fb(), foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1227 of file walsender.c.

1228{
1229 const char *snapshot_name = NULL;
1230 char xloc[MAXFNAMELEN];
1231 char *slot_name;
1232 bool reserve_wal = false;
1233 bool two_phase = false;
1234 bool failover = false;
1238 TupleDesc tupdesc;
1239 Datum values[4];
1240 bool nulls[4] = {0};
1241
1243
1245 &failover);
1246
1247 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1248 {
1249 ReplicationSlotCreate(cmd->slotname, false,
1251 false, false, false, false);
1252
1253 if (reserve_wal)
1254 {
1256
1258
1259 /* Write this slot to disk if it's a permanent one. */
1260 if (!cmd->temporary)
1262 }
1263 }
1264 else
1265 {
1267 bool need_full_snapshot = false;
1268
1270
1272
1273 /*
1274 * Initially create persistent slot as ephemeral - that allows us to
1275 * nicely handle errors during initialization because it'll get
1276 * dropped if this transaction fails. We'll make it persistent at the
1277 * end. Temporary slots can be created as temporary from beginning as
1278 * they get dropped on error as well.
1279 */
1283
1284 /*
1285 * Do options check early so that we can bail before calling the
1286 * DecodingContextFindStartpoint which can take long time.
1287 */
1289 {
1290 if (IsTransactionBlock())
1291 ereport(ERROR,
1292 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1293 (errmsg("%s must not be called inside a transaction",
1294 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1295
1296 need_full_snapshot = true;
1297 }
1299 {
1300 if (!IsTransactionBlock())
1301 ereport(ERROR,
1302 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1303 (errmsg("%s must be called inside a transaction",
1304 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1305
1307 ereport(ERROR,
1308 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1309 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1310 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1311 if (!XactReadOnly)
1312 ereport(ERROR,
1313 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1314 (errmsg("%s must be called in a read-only transaction",
1315 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1316
1317 if (FirstSnapshotSet)
1318 ereport(ERROR,
1319 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1320 (errmsg("%s must be called before any query",
1321 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1322
1323 if (IsSubTransaction())
1324 ereport(ERROR,
1325 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1326 (errmsg("%s must not be called in a subtransaction",
1327 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1328
1329 need_full_snapshot = true;
1330 }
1331
1332 /*
1333 * Ensure the logical decoding is enabled before initializing the
1334 * logical decoding context.
1335 */
1338
1340 false,
1343 .segment_open = WalSndSegmentOpen,
1344 .segment_close = wal_segment_close),
1347
1348 /*
1349 * Signal that we don't need the timeout mechanism. We're just
1350 * creating the replication slot and don't yet accept feedback
1351 * messages or send keepalives. As we possibly need to wait for
1352 * further WAL the walsender would otherwise possibly be killed too
1353 * soon.
1354 */
1356
1357 /* build initial snapshot, might take a while */
1359
1360 /*
1361 * Export or use the snapshot if we've been asked to do so.
1362 *
1363 * NB. We will convert the snapbuild.c kind of snapshot to normal
1364 * snapshot when doing this.
1365 */
1367 {
1369 }
1371 {
1372 Snapshot snap;
1373
1376 }
1377
1378 /* don't need the decoding context anymore */
1380
1381 if (!cmd->temporary)
1383 }
1384
1385 snprintf(xloc, sizeof(xloc), "%X/%08X",
1387
1389
1390 /*----------
1391 * Need a tuple descriptor representing four columns:
1392 * - first field: the slot name
1393 * - second field: LSN at which we became consistent
1394 * - third field: exported snapshot's name
1395 * - fourth field: output plugin
1396 */
1397 tupdesc = CreateTemplateTupleDesc(4);
1398 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1399 TEXTOID, -1, 0);
1400 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1401 TEXTOID, -1, 0);
1402 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1403 TEXTOID, -1, 0);
1404 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1405 TEXTOID, -1, 0);
1406 TupleDescFinalize(tupdesc);
1407
1408 /* prepare for projection of tuples */
1410
1411 /* slot_name */
1412 slot_name = NameStr(MyReplicationSlot->data.name);
1413 values[0] = CStringGetTextDatum(slot_name);
1414
1415 /* consistent wal location */
1417
1418 /* snapshot name, or NULL if none */
1419 if (snapshot_name != NULL)
1421 else
1422 nulls[2] = true;
1423
1424 /* plugin, or NULL if none */
1425 if (cmd->plugin != NULL)
1427 else
1428 nulls[3] = true;
1429
1430 /* send it to dest */
1431 do_tup_output(tstate, values, nulls);
1433
1435}
int16 AttrNumber
Definition attnum.h:21
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define NameStr(name)
Definition c.h:835
#define Assert(condition)
Definition c.h:943
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition dest.c:113
@ DestRemoteSimple
Definition dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define false
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:673
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:629
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, bool for_repack, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:325
void CheckLogicalDecodingRequirements(bool repack)
Definition logical.c:111
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:202
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:303
#define NIL
Definition pg_list.h:68
#define snprintf
Definition port.h:261
uint64_t Datum
Definition postgres.h:70
@ REPLICATION_KIND_PHYSICAL
Definition replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition replnodes.h:23
void ReplicationSlotMarkDirty(void)
Definition slot.c:1184
void ReplicationSlotReserveWal(void)
Definition slot.c:1711
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
void ReplicationSlotPersist(void)
Definition slot.c:1201
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
void ReplicationSlotSave(void)
Definition slot.c:1166
void ReplicationSlotRelease(void)
Definition slot.c:769
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:458
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition snapbuild.c:556
bool FirstSnapshotSet
Definition snapmgr.c:193
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
Definition snapmgr.c:1853
PGPROC * MyProc
Definition proc.c:71
ReplicationKind kind
Definition replnodes.h:56
struct SnapBuild * snapshot_builder
Definition logical.h:44
ReplicationSlotPersistentData data
Definition slot.h:213
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:165
void TupleDescFinalize(TupleDesc tupdesc)
Definition tupdesc.c:511
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:976
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition walsender.c:1150
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition walsender.c:3244
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1612
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition walsender.c:1736
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition walsender.c:1077
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1585
static TimestampTz last_reply_timestamp
Definition walsender.c:204
CRSSnapshotAction
Definition walsender.h:21
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition walsender.h:22
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
bool IsSubTransaction(void)
Definition xact.c:5095
bool IsTransactionBlock(void)
Definition xact.c:5022
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), EnsureLogicalDecodingEnabled(), ereport, errmsg, ERROR, failover, fb(), FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsLogicalDecodingEnabled(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1441 of file walsender.c.

1442{
1443 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1444}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:920

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char * cmd_string)

Definition at line 2065 of file walsender.c.

2066{
2067 yyscan_t scanner;
2068 int parse_rc;
2069 Node *cmd_node;
2070 const char *cmdtag;
2072
2073 /* We save and re-use the cmd_context across calls */
2075
2076 /*
2077 * If WAL sender has been told that shutdown is getting close, switch its
2078 * status accordingly to handle the next replication commands correctly.
2079 */
2080 if (got_STOPPING)
2082
2083 /*
2084 * Throw error if in stopping mode. We need prevent commands that could
2085 * generate WAL while the shutdown checkpoint is being written. To be
2086 * safe, we just prohibit all new commands.
2087 */
2089 ereport(ERROR,
2091 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2092
2093 /*
2094 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2095 * command arrives. Clean up the old stuff if there's anything.
2096 */
2098
2100
2101 /*
2102 * Prepare to parse and execute the command.
2103 *
2104 * Because replication command execution can involve beginning or ending
2105 * transactions, we need a working context that will survive that, so we
2106 * make it a child of TopMemoryContext. That in turn creates a hazard of
2107 * long-lived memory leaks if we lose track of the working context. We
2108 * deal with that by creating it only once per walsender, and resetting it
2109 * for each new command. (Normally this reset is a no-op, but if the
2110 * prior exec_replication_command call failed with an error, it won't be.)
2111 *
2112 * This is subtler than it looks. The transactions we manage can extend
2113 * across replication commands, indeed SnapBuildClearExportedSnapshot
2114 * might have just ended one. Because transaction exit will revert to the
2115 * memory context that was current at transaction start, we need to be
2116 * sure that that context is still valid. That motivates re-using the
2117 * same cmd_context rather than making a new one each time.
2118 */
2119 if (cmd_context == NULL)
2121 "Replication command context",
2123 else
2125
2127
2129
2130 /*
2131 * Is it a WalSender command?
2132 */
2134 {
2135 /* Nope; clean up and get out. */
2137
2140
2141 /* XXX this is a pretty random place to make this check */
2142 if (MyDatabaseId == InvalidOid)
2143 ereport(ERROR,
2145 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2146
2147 /* Tell the caller that this wasn't a WalSender command. */
2148 return false;
2149 }
2150
2151 /*
2152 * Looks like a WalSender command, so parse it.
2153 */
2155 if (parse_rc != 0)
2156 ereport(ERROR,
2158 errmsg_internal("replication command parser returned %d",
2159 parse_rc)));
2161
2162 /*
2163 * Report query to various monitoring facilities. For this purpose, we
2164 * report replication commands just like SQL commands.
2165 */
2167
2169
2170 /*
2171 * Log replication command if log_replication_commands is enabled. Even
2172 * when it's disabled, log the command with DEBUG1 level for backward
2173 * compatibility.
2174 */
2176 (errmsg("received replication command: %s", cmd_string)));
2177
2178 /*
2179 * Disallow replication commands in aborted transaction blocks.
2180 */
2182 ereport(ERROR,
2184 errmsg("current transaction is aborted, "
2185 "commands ignored until end of transaction block")));
2186
2188
2189 /*
2190 * Allocate buffers that will be used for each outgoing and incoming
2191 * message. We do this just once per command to reduce palloc overhead.
2192 */
2196
2197 switch (cmd_node->type)
2198 {
2200 cmdtag = "IDENTIFY_SYSTEM";
2204 break;
2205
2207 cmdtag = "READ_REPLICATION_SLOT";
2211 break;
2212
2213 case T_BaseBackupCmd:
2214 cmdtag = "BASE_BACKUP";
2219 break;
2220
2222 cmdtag = "CREATE_REPLICATION_SLOT";
2226 break;
2227
2229 cmdtag = "DROP_REPLICATION_SLOT";
2233 break;
2234
2236 cmdtag = "ALTER_REPLICATION_SLOT";
2240 break;
2241
2243 {
2245
2246 cmdtag = "START_REPLICATION";
2249
2250 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2251 StartReplication(cmd);
2252 else
2254
2255 /* dupe, but necessary per libpqrcv_endstreaming */
2257
2259 break;
2260 }
2261
2263 cmdtag = "TIMELINE_HISTORY";
2268 break;
2269
2270 case T_VariableShowStmt:
2271 {
2274
2275 cmdtag = "SHOW";
2277
2278 /* syscache access needs a transaction environment */
2280 GetPGVariable(n->name, dest);
2283 }
2284 break;
2285
2287 cmdtag = "UPLOAD_MANIFEST";
2292 break;
2293
2294 default:
2295 elog(ERROR, "unrecognized replication command node tag: %u",
2296 cmd_node->type);
2297 }
2298
2299 /*
2300 * Done. Revert to caller's memory context, and clean out the cmd_context
2301 * to recover memory right away.
2302 */
2305
2306 /*
2307 * We need not update ps display or pg_stat_activity, because PostgresMain
2308 * will reset those to "idle". But we must reset debug_query_string to
2309 * ensure it doesn't become a dangling pointer.
2310 */
2312
2313 return true;
2314}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition basebackup.c:992
void * yyscan_t
Definition cubedata.h:65
void EndReplicationCommand(const char *commandTag)
Definition dest.c:217
#define LOG
Definition elog.h:32
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define DEBUG1
Definition elog.h:31
Oid MyDatabaseId
Definition globals.c:96
void GetPGVariable(const char *name, DestReceiver *dest)
Definition guc_funcs.c:410
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:406
MemoryContext TopMemoryContext
Definition mcxt.c:167
MemoryContext CurrentMemoryContext
Definition mcxt.c:161
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
const char * debug_query_string
Definition postgres.c:94
#define InvalidOid
static void set_ps_display(const char *activity)
Definition ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
void replication_scanner_finish(yyscan_t yyscanner)
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
void SnapBuildClearExportedSnapshot(void)
Definition snapbuild.c:617
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition nodes.h:135
ReplicationKind kind
Definition replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition walsender.c:1450
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition walsender.c:611
WalSnd * MyWalSnd
Definition walsender.c:132
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition walsender.c:511
static StringInfoData tmpbuf
Definition walsender.c:195
static void IdentifySystem(void)
Definition walsender.c:429
static StringInfoData reply_message
Definition walsender.c:194
void WalSndSetState(WalSndState state)
Definition walsender.c:4155
static StringInfoData output_message
Definition walsender.c:193
static void UploadManifest(void)
Definition walsender.c:702
static volatile sig_atomic_t got_STOPPING
Definition walsender.c:233
bool log_replication_commands
Definition walsender.c:150
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition walsender.c:1227
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition walsender.c:1492
static IncrementalBackupInfo * uploaded_manifest
Definition walsender.c:172
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition walsender.c:1441
static void StartReplication(StartReplicationCmd *cmd)
Definition walsender.c:844
static XLogReaderState * xlogreader
Definition walsender.c:162
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
void StartTransactionCommand(void)
Definition xact.c:3109
bool IsAbortedTransactionBlockState(void)
Definition xact.c:409
void CommitTransactionCommand(void)
Definition xact.c:3207

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg, errmsg_internal(), ERROR, fb(), GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)

Definition at line 3858 of file walsender.c.

3859{
3861 TimeLineID replayTLI;
3865
3867
3868 /*
3869 * We can safely send what's already been replayed. Also, if walreceiver
3870 * is streaming WAL from the same timeline, we can send anything that it
3871 * has streamed, but hasn't been replayed yet.
3872 */
3873
3875 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3876
3877 if (tli)
3878 *tli = replayTLI;
3879
3880 result = replayPtr;
3881 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3883
3884 return result;
3885}
uint32 result
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1909
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition walsender.c:136
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, fb(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), receiveTLI, and result.

Referenced by IdentifySystem(), StartReplication(), update_local_synced_slot(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *offset,
IncrementalBackupInfo *ib 
)
static

Definition at line 768 of file walsender.c.

770{
771 int mtype;
772 int maxmsglen;
773
775
777 mtype = pq_getbyte();
778 if (mtype == EOF)
781 errmsg("unexpected EOF on client connection with an open transaction")));
782
783 switch (mtype)
784 {
785 case PqMsg_CopyData:
787 break;
788 case PqMsg_CopyDone:
789 case PqMsg_CopyFail:
790 case PqMsg_Flush:
791 case PqMsg_Sync:
793 break;
794 default:
797 errmsg("unexpected message type 0x%02X during COPY from stdin",
798 mtype)));
799 maxmsglen = 0; /* keep compiler quiet */
800 break;
801 }
802
803 /* Now collect the message body */
807 errmsg("unexpected EOF on client connection with an open transaction")));
809
810 /* Process the message */
811 switch (mtype)
812 {
813 case PqMsg_CopyData:
815 return true;
816
817 case PqMsg_CopyDone:
818 return false;
819
820 case PqMsg_Sync:
821 case PqMsg_Flush:
822 /* Ignore these while in CopyIn mode as we do elsewhere. */
823 return true;
824
825 case PqMsg_CopyFail:
828 errmsg("COPY from stdin failed: %s",
830 }
831
832 /* Not reached. */
833 Assert(false);
834 return false;
835}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
#define PQ_SMALL_MESSAGE_LIMIT
Definition libpq.h:33
#define PQ_LARGE_MESSAGE_LIMIT
Definition libpq.h:34
#define HOLD_CANCEL_INTERRUPTS()
Definition miscadmin.h:144
#define RESUME_CANCEL_INTERRUPTS()
Definition miscadmin.h:146
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pq_getmessage(StringInfo s, int maxlen)
Definition pqcomm.c:1204
int pq_getbyte(void)
Definition pqcomm.c:964
void pq_startmsgread(void)
Definition pqcomm.c:1142
const char * pq_getmsgstring(StringInfo msg)
Definition pqformat.c:578
#define PqMsg_CopyDone
Definition protocol.h:64
#define PqMsg_CopyData
Definition protocol.h:65
#define PqMsg_Sync
Definition protocol.h:27
#define PqMsg_CopyFail
Definition protocol.h:29
#define PqMsg_Flush
Definition protocol.h:24

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, ERROR, fb(), HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_CopyFail, PqMsg_Flush, PqMsg_Sync, and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3914 of file walsender.c.

3915{
3917
3918 /*
3919 * If replication has not yet started, die like with SIGTERM. If
3920 * replication is active, only set a flag and wake up the main loop. It
3921 * will send any outstanding WAL, wait for it to be replicated to the
3922 * standby, and then exit gracefully.
3923 */
3924 if (!replication_active)
3926 else
3927 got_STOPPING = true;
3928
3929 /* latch will be set by procsignal_sigusr1_handler */
3930}
int MyProcPid
Definition globals.c:49
bool am_walsender
Definition walsender.c:135
static volatile sig_atomic_t replication_active
Definition walsender.c:241
#define kill(pid, sig)
Definition win32_port.h:490

References am_walsender, Assert, fb(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 429 of file walsender.c.

430{
431 char sysid[32];
432 char xloc[MAXFNAMELEN];
434 char *dbname = NULL;
437 TupleDesc tupdesc;
438 Datum values[4];
439 bool nulls[4] = {0};
440 TimeLineID currTLI;
441
442 /*
443 * Reply with a result set with one row, four columns. First col is system
444 * ID, second is timeline ID, third is current xlog location and the
445 * fourth contains the database name if we are connected to one.
446 */
447
450
453 logptr = GetStandbyFlushRecPtr(&currTLI);
454 else
455 logptr = GetFlushRecPtr(&currTLI);
456
457 snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
458
460 {
462
463 /* syscache access needs a transaction env. */
466 /* copy dbname out of TX context */
469 }
470
472
473 /* need a tuple descriptor representing four columns */
474 tupdesc = CreateTemplateTupleDesc(4);
475 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
476 TEXTOID, -1, 0);
477 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
478 INT8OID, -1, 0);
479 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
480 TEXTOID, -1, 0);
481 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
482 TEXTOID, -1, 0);
483 TupleDescFinalize(tupdesc);
484
485 /* prepare for projection of tuples */
487
488 /* column 1: system identifier */
490
491 /* column 2: timeline */
492 values[1] = Int64GetDatum(currTLI);
493
494 /* column 3: wal location */
496
497 /* column 4: database name, or NULL if none */
498 if (dbname)
500 else
501 nulls[3] = true;
502
503 /* send it to dest */
504 do_tup_output(tstate, values, nulls);
505
507}
#define UINT64_FORMAT
Definition c.h:635
struct cursor * cur
Definition ecpg.c:29
char * get_database_name(Oid dbid)
Definition lsyscache.c:1323
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1897
static Datum Int64GetDatum(int64 X)
Definition postgres.h:426
char * dbname
Definition streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition walsender.c:3858
uint64 GetSystemIdentifier(void)
Definition xlog.c:4643
bool RecoveryInProgress(void)
Definition xlog.c:6832
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6997

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), fb(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 330 of file walsender.c.

331{
333
334 /* Create a per-walsender data structure in shared memory */
336
337 /* need resource owner for e.g. basebackups */
339
340 /*
341 * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
342 * a WAL sender process, postmaster will let us outlive the bgwriter and
343 * kill us last in the shutdown sequence, so we get a chance to stream all
344 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
345 * there's no going back, and we mustn't write any WAL records after this.
346 */
349
350 /*
351 * If the client didn't specify a database to connect to, show in PGPROC
352 * that our advertised xmin should affect vacuum horizons in all
353 * databases. This allows physical replication clients to send hot
354 * standby feedback that will delay vacuum cleanup in all databases.
355 */
357 {
363 }
364
365 /* Initialize empty timestamp buffer for lag tracking. */
367}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1269
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:164
void MarkPostmasterChildWalSender(void)
Definition pmsignal.c:308
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition pmsignal.h:44
#define PROC_AFFECTS_ALL_HORIZONS
Definition proc.h:66
void CreateAuxProcessResourceOwner(void)
Definition resowner.c:996
PROC_HDR * ProcGlobal
Definition proc.c:74
TransactionId xmin
Definition proc.h:242
uint8 statusFlags
Definition proc.h:210
int pgxactoff
Definition proc.h:207
uint8 * statusFlags
Definition proc.h:456
#define InvalidTransactionId
Definition transam.h:31
static void InitWalSenderSlot(void)
Definition walsender.c:3152
static LagTracker * lag_tracker
Definition walsender.c:279

References am_cascading_walsender, Assert, CreateAuxProcessResourceOwner(), fb(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 3152 of file walsender.c.

3153{
3154 int i;
3155
3156 /*
3157 * WalSndCtl should be set up already (we inherit this by fork() or
3158 * EXEC_BACKEND mechanism from the postmaster).
3159 */
3160 Assert(WalSndCtl != NULL);
3161 Assert(MyWalSnd == NULL);
3162
3163 /*
3164 * Find a free walsender slot and reserve it. This must not fail due to
3165 * the prior check for free WAL senders in InitProcess().
3166 */
3167 for (i = 0; i < max_wal_senders; i++)
3168 {
3170
3171 SpinLockAcquire(&walsnd->mutex);
3172
3173 if (walsnd->pid != 0)
3174 {
3175 SpinLockRelease(&walsnd->mutex);
3176 continue;
3177 }
3178 else
3179 {
3180 /*
3181 * Found a free slot. Reserve it for us.
3182 */
3183 walsnd->pid = MyProcPid;
3184 walsnd->state = WALSNDSTATE_STARTUP;
3185 walsnd->sentPtr = InvalidXLogRecPtr;
3186 walsnd->needreload = false;
3187 walsnd->write = InvalidXLogRecPtr;
3188 walsnd->flush = InvalidXLogRecPtr;
3189 walsnd->apply = InvalidXLogRecPtr;
3190 walsnd->writeLag = -1;
3191 walsnd->flushLag = -1;
3192 walsnd->applyLag = -1;
3193 walsnd->sync_standby_priority = 0;
3194 walsnd->replyTime = 0;
3195
3196 /*
3197 * The kind assignment is done here and not in StartReplication()
3198 * and StartLogicalReplication(). Indeed, the logical walsender
3199 * needs to read WAL records (like snapshot of running
3200 * transactions) during the slot creation. So it needs to be woken
3201 * up based on its kind.
3202 *
3203 * The kind assignment could also be done in StartReplication(),
3204 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
3205 * seems better to set it in one place.
3206 */
3207 if (MyDatabaseId == InvalidOid)
3209 else
3211
3212 SpinLockRelease(&walsnd->mutex);
3213 /* don't need the lock anymore */
3214 MyWalSnd = walsnd;
3215
3216 break;
3217 }
3218 }
3219
3220 Assert(MyWalSnd != NULL);
3221
3222 /* Arrange to clean up at walsender exit */
3224}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
int max_wal_senders
Definition walsender.c:141
static void WalSndKill(int code, Datum arg)
Definition walsender.c:3228
WalSndCtlData * WalSndCtl
Definition walsender.c:121
@ WALSNDSTATE_STARTUP

References Assert, fb(), i, InvalidOid, InvalidXLogRecPtr, max_wal_senders, MyDatabaseId, MyProcPid, MyWalSnd, on_shmem_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, and WALSNDSTATE_STARTUP.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4499 of file walsender.c.

4500{
4501 TimestampTz time = 0;
4502
4503 /*
4504 * If 'lsn' has not passed the WAL position stored in the overflow entry,
4505 * return the elapsed time (in microseconds) since the saved local flush
4506 * time. If the flush time is in the future (due to clock drift), return
4507 * -1 to treat as no valid sample.
4508 *
4509 * Otherwise, switch back to using the buffer to control the read head and
4510 * compute the elapsed time. The read head is then reset to point to the
4511 * oldest entry in the buffer.
4512 */
4513 if (lag_tracker->read_heads[head] == -1)
4514 {
4515 if (lag_tracker->overflowed[head].lsn > lsn)
4516 return (now >= lag_tracker->overflowed[head].time) ?
4517 now - lag_tracker->overflowed[head].time : -1;
4518
4519 time = lag_tracker->overflowed[head].time;
4521 lag_tracker->read_heads[head] =
4523 }
4524
4525 /* Read all unread samples up to this LSN or end of buffer. */
4526 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4528 {
4530 lag_tracker->last_read[head] =
4532 lag_tracker->read_heads[head] =
4534 }
4535
4536 /*
4537 * If the lag tracker is empty, that means the standby has processed
4538 * everything we've ever sent so we should now clear 'last_read'. If we
4539 * didn't do that, we'd risk using a stale and irrelevant sample for
4540 * interpolation at the beginning of the next burst of WAL after a period
4541 * of idleness.
4542 */
4544 lag_tracker->last_read[head].time = 0;
4545
4546 if (time > now)
4547 {
4548 /* If the clock somehow went backwards, treat as not found. */
4549 return -1;
4550 }
4551 else if (time == 0)
4552 {
4553 /*
4554 * We didn't cross a time. If there is a future sample that we
4555 * haven't reached yet, and we've already reached at least one sample,
4556 * let's interpolate the local flushed time. This is mainly useful
4557 * for reporting a completely stuck apply position as having
4558 * increasing lag, since otherwise we'd have to wait for it to
4559 * eventually start moving again and cross one of our samples before
4560 * we can show the lag increasing.
4561 */
4563 {
4564 /* There are no future samples, so we can't interpolate. */
4565 return -1;
4566 }
4567 else if (lag_tracker->last_read[head].time != 0)
4568 {
4569 /* We can interpolate between last_read and the next sample. */
4570 double fraction;
4571 WalTimeSample prev = lag_tracker->last_read[head];
4573
4574 if (lsn < prev.lsn)
4575 {
4576 /*
4577 * Reported LSNs shouldn't normally go backwards, but it's
4578 * possible when there is a timeline change. Treat as not
4579 * found.
4580 */
4581 return -1;
4582 }
4583
4584 Assert(prev.lsn < next.lsn);
4585
4586 if (prev.time > next.time)
4587 {
4588 /* If the clock somehow went backwards, treat as not found. */
4589 return -1;
4590 }
4591
4592 /* See how far we are between the previous and next samples. */
4593 fraction =
4594 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4595
4596 /* Scale the local flush time proportionally. */
4597 time = (TimestampTz)
4598 ((double) prev.time + (next.time - prev.time) * fraction);
4599 }
4600 else
4601 {
4602 /*
4603 * We have only a future sample, implying that we were entirely
4604 * caught up, but now there is a new burst of WAL and the
4605 * standby hasn't processed the first sample yet. Until the
4606 * standby reaches the future sample the best we can do is report
4607 * the hypothetical lag if that sample were to be replayed now.
4608 */
4610 }
4611 }
4612
4613 /* Return the elapsed time since local flush time in microseconds. */
4614 Assert(time != 0);
4615 return now - time;
4616}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
static int32 next
Definition blutils.c:225
int64 TimestampTz
Definition timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition walsender.c:259
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:261
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:262
int write_head
Definition walsender.c:260
WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:276
TimestampTz time
Definition walsender.c:249
XLogRecPtr lsn
Definition walsender.c:248
#define LAG_TRACKER_BUFFER_SIZE
Definition walsender.c:253

References Assert, LagTracker::buffer, fb(), lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4441 of file walsender.c.

4442{
4443 int new_write_head;
4444 int i;
4445
4446 if (!am_walsender)
4447 return;
4448
4449 /*
4450 * If the lsn hasn't advanced since last time, then do nothing. This way
4451 * we only record a new sample when new WAL has been written.
4452 */
4453 if (lag_tracker->last_lsn == lsn)
4454 return;
4455 lag_tracker->last_lsn = lsn;
4456
4457 /*
4458 * If advancing the write head of the circular buffer would crash into any
4459 * of the read heads, then the buffer is full. In other words, the
4460 * slowest reader (presumably apply) is the one that controls the release
4461 * of space.
4462 */
4464 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4465 {
4466 /*
4467 * If the buffer is full, move the slowest reader to a separate
4468 * overflow entry and free its space in the buffer so the write head
4469 * can advance.
4470 */
4472 {
4475 lag_tracker->read_heads[i] = -1;
4476 }
4477 }
4478
4479 /* Store a sample at the current write head position. */
4483}
XLogRecPtr last_lsn
Definition walsender.c:258
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27

References am_walsender, LagTracker::buffer, fb(), i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *cur_page 
)
static

Definition at line 1077 of file walsender.c.

1079{
1081 int count;
1083 XLogSegNo segno;
1084 TimeLineID currTLI;
1085
1086 /*
1087 * Make sure we have enough WAL available before retrieving the current
1088 * timeline.
1089 */
1091
1092 /* Fail if not enough (implies we are going to shut down) */
1094 return -1;
1095
1096 /*
1097 * Since logical decoding is also permitted on a standby server, we need
1098 * to check if the server is in recovery to decide how to get the current
1099 * timeline ID (so that it also covers the promotion or timeline change
1100 * cases). We must determine am_cascading_walsender after waiting for the
1101 * required WAL so that it is correct when the walsender wakes up after a
1102 * promotion.
1103 */
1105
1107 GetXLogReplayRecPtr(&currTLI);
1108 else
1109 currTLI = GetWALInsertionTimeLine();
1110
1112 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1113 sendTimeLine = state->currTLI;
1114 sendTimeLineValidUpto = state->currTLIValidUntil;
1115 sendTimeLineNextTLI = state->nextTLI;
1116
1118 count = XLOG_BLCKSZ; /* more than one block available */
1119 else
1120 count = flushptr - targetPagePtr; /* part of the page available */
1121
1122 /* now actually read the data, we know it's there */
1123 if (!WALRead(state,
1124 cur_page,
1126 count,
1127 currTLI, /* Pass the current TLI because only
1128 * WalSndSegmentOpen controls whether new TLI
1129 * is needed. */
1130 &errinfo))
1132
1133 /*
1134 * After reading into the buffer, check that what we read was valid. We do
1135 * this after reading, because even though the segment was present when we
1136 * opened it, it might get recycled or removed while we read it. The
1137 * read() succeeds in that case, but the data we tried to read might
1138 * already have been overwritten with new WAL records.
1139 */
1140 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1141 CheckXLogRemoved(segno, state->seg.ws_tli);
1142
1143 return count;
1144}
static TimeLineID sendTimeLine
Definition walsender.c:181
static bool sendTimeLineIsHistoric
Definition walsender.c:183
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition walsender.c:1886
static TimeLineID sendTimeLineNextTLI
Definition walsender.c:182
static XLogRecPtr sendTimeLineValidUpto
Definition walsender.c:184
TimeLineID GetWALInsertionTimeLine(void)
Definition xlog.c:7018
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition xlog.c:3778
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition xlogdefs.h:52
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 *wait_event 
)
static

Definition at line 1826 of file walsender.c.

1827{
1828 int elevel = got_STOPPING ? ERROR : WARNING;
1829 bool failover_slot;
1830
1832
1833 /*
1834 * Note that after receiving the shutdown signal, an ERROR is reported if
1835 * any slots are dropped, invalidated, or inactive. This measure is taken
1836 * to prevent the walsender from waiting indefinitely.
1837 */
1839 {
1841 return true;
1842 }
1843
1844 *wait_event = 0;
1845 return false;
1846}
#define WARNING
Definition elog.h:37
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3113

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, fb(), got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 *wait_event 
)
static

Definition at line 1858 of file walsender.c.

1860{
1861 /* Check if we need to wait for WALs to be flushed to disk */
1862 if (target_lsn > flushed_lsn)
1863 {
1865 return true;
1866 }
1867
1868 /* Check if the standby slots have caught up to the flushed position */
1870}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1826

References fb(), and NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 4193 of file walsender.c.

4194{
4196
4197 result->month = 0;
4198 result->day = 0;
4199 result->time = offset;
4200
4201 return result;
4202}
#define palloc_object(type)
Definition fe_memutils.h:89

References palloc_object, and result.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
CRSSnapshotAction *snapshot_action,
bool *two_phase,
bool *failover 
)
static

Definition at line 1150 of file walsender.c.

1154{
1155 ListCell *lc;
1156 bool snapshot_action_given = false;
1157 bool reserve_wal_given = false;
1158 bool two_phase_given = false;
1159 bool failover_given = false;
1160
1161 /* Parse options */
1162 foreach(lc, cmd->options)
1163 {
1164 DefElem *defel = (DefElem *) lfirst(lc);
1165
1166 if (strcmp(defel->defname, "snapshot") == 0)
1167 {
1168 char *action;
1169
1171 ereport(ERROR,
1173 errmsg("conflicting or redundant options")));
1174
1176 snapshot_action_given = true;
1177
1178 if (strcmp(action, "export") == 0)
1180 else if (strcmp(action, "nothing") == 0)
1182 else if (strcmp(action, "use") == 0)
1184 else
1185 ereport(ERROR,
1187 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
1188 "CREATE_REPLICATION_SLOT", defel->defname, action)));
1189 }
1190 else if (strcmp(defel->defname, "reserve_wal") == 0)
1191 {
1193 ereport(ERROR,
1195 errmsg("conflicting or redundant options")));
1196
1197 reserve_wal_given = true;
1199 }
1200 else if (strcmp(defel->defname, "two_phase") == 0)
1201 {
1203 ereport(ERROR,
1205 errmsg("conflicting or redundant options")));
1206 two_phase_given = true;
1208 }
1209 else if (strcmp(defel->defname, "failover") == 0)
1210 {
1212 ereport(ERROR,
1214 errmsg("conflicting or redundant options")));
1215 failover_given = true;
1217 }
1218 else
1219 elog(ERROR, "unrecognized option: %s", defel->defname);
1220 }
1221}
char * defGetString(DefElem *def)
Definition define.c:34
#define lfirst(lc)
Definition pg_list.h:172
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), elog, ereport, errcode(), errmsg, ERROR, failover, fb(), CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 4209 of file walsender.c.

4210{
4211#define PG_STAT_GET_WAL_SENDERS_COLS 12
4212 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4214 int num_standbys;
4215 int i;
4216
4217 InitMaterializedSRF(fcinfo, 0);
4218
4219 /*
4220 * Get the currently active synchronous standbys. This could be out of
4221 * date before we're done, but we'll use the data anyway.
4222 */
4224
4225 for (i = 0; i < max_wal_senders; i++)
4226 {
4230 XLogRecPtr flush;
4231 XLogRecPtr apply;
4232 TimeOffset writeLag;
4233 TimeOffset flushLag;
4234 TimeOffset applyLag;
4235 int priority;
4236 int pid;
4238 TimestampTz replyTime;
4239 bool is_sync_standby;
4241 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
4242 int j;
4243
4244 /* Collect data from shared memory */
4245 SpinLockAcquire(&walsnd->mutex);
4246 if (walsnd->pid == 0)
4247 {
4248 SpinLockRelease(&walsnd->mutex);
4249 continue;
4250 }
4251 pid = walsnd->pid;
4252 sent_ptr = walsnd->sentPtr;
4253 state = walsnd->state;
4254 write = walsnd->write;
4255 flush = walsnd->flush;
4256 apply = walsnd->apply;
4257 writeLag = walsnd->writeLag;
4258 flushLag = walsnd->flushLag;
4259 applyLag = walsnd->applyLag;
4260 priority = walsnd->sync_standby_priority;
4261 replyTime = walsnd->replyTime;
4262 SpinLockRelease(&walsnd->mutex);
4263
4264 /*
4265 * Detect whether walsender is/was considered synchronous. We can
4266 * provide some protection against stale data by checking the PID
4267 * along with walsnd_index.
4268 */
4269 is_sync_standby = false;
4270 for (j = 0; j < num_standbys; j++)
4271 {
4272 if (sync_standbys[j].walsnd_index == i &&
4273 sync_standbys[j].pid == pid)
4274 {
4275 is_sync_standby = true;
4276 break;
4277 }
4278 }
4279
4280 values[0] = Int32GetDatum(pid);
4281
4283 {
4284 /*
4285 * Only superusers and roles with privileges of pg_read_all_stats
4286 * can see details. Other users only get the pid value to know
4287 * it's a walsender, but no details.
4288 */
4289 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4290 }
4291 else
4292 {
4294
4296 nulls[2] = true;
4298
4300 nulls[3] = true;
4301 values[3] = LSNGetDatum(write);
4302
4303 if (!XLogRecPtrIsValid(flush))
4304 nulls[4] = true;
4305 values[4] = LSNGetDatum(flush);
4306
4307 if (!XLogRecPtrIsValid(apply))
4308 nulls[5] = true;
4309 values[5] = LSNGetDatum(apply);
4310
4311 /*
4312 * Treat a standby such as a pg_basebackup background process
4313 * which always returns an invalid flush location, as an
4314 * asynchronous standby.
4315 */
4316 priority = XLogRecPtrIsValid(flush) ? priority : 0;
4317
4318 if (writeLag < 0)
4319 nulls[6] = true;
4320 else
4322
4323 if (flushLag < 0)
4324 nulls[7] = true;
4325 else
4327
4328 if (applyLag < 0)
4329 nulls[8] = true;
4330 else
4332
4334
4335 /*
4336 * More easily understood version of standby state. This is purely
4337 * informational.
4338 *
4339 * In quorum-based sync replication, the role of each standby
4340 * listed in synchronous_standby_names can be changing very
4341 * frequently. Any standbys considered as "sync" at one moment can
4342 * be switched to "potential" ones at the next moment. So, it's
4343 * basically useless to report "sync" or "potential" as their sync
4344 * states. We report just "quorum" for them.
4345 */
4346 if (priority == 0)
4347 values[10] = CStringGetTextDatum("async");
4348 else if (is_sync_standby)
4350 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4351 else
4352 values[10] = CStringGetTextDatum("potential");
4353
4354 if (replyTime == 0)
4355 nulls[11] = true;
4356 else
4357 values[11] = TimestampTzGetDatum(replyTime);
4358 }
4359
4360 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4361 values, nulls);
4362 }
4363
4364 return (Datum) 0;
4365}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
#define MemSet(start, val, len)
Definition c.h:1107
int64 TimeOffset
Definition timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
Definition funcapi.c:76
#define write(a, b, c)
Definition win32.h:14
int j
Definition isn.c:78
Oid GetUserId(void)
Definition miscinit.c:470
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
uint8 syncrep_method
Definition syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:764
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition walsender.c:4193
static const char * WalSndGetStateString(WalSndState state)
Definition walsender.c:4174
WalSndState
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References CStringGetTextDatum, fb(), GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, SpinLockAcquire(), SpinLockRelease(), SYNC_REP_PRIORITY, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, write, and XLogRecPtrIsValid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2472 of file walsender.c.

2473{
2474 bool changed = false;
2476
2478 SpinLockAcquire(&slot->mutex);
2479 if (slot->data.restart_lsn != lsn)
2480 {
2481 changed = true;
2482 slot->data.restart_lsn = lsn;
2483 }
2484 SpinLockRelease(&slot->mutex);
2485
2486 if (changed)
2487 {
2491 }
2492
2493 /*
2494 * One could argue that the slot should be saved to disk now, but that'd
2495 * be energy wasted - the worst thing lost information could cause here is
2496 * to give wrong information in a statistics view - we'll just potentially
2497 * be more conservative in removing files.
2498 */
2499}
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1308
slock_t mutex
Definition slot.h:183
void PhysicalWakeupLogicalWalSnd(void)
Definition walsender.c:1801

References Assert, ReplicationSlot::data, fb(), ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2613 of file walsender.c.

2614{
2615 bool changed = false;
2617
2618 SpinLockAcquire(&slot->mutex);
2620
2621 /*
2622 * For physical replication we don't need the interlock provided by xmin
2623 * and effective_xmin since the consequences of a missed increase are
2624 * limited to query cancellations, so set both at once.
2625 */
2626 if (!TransactionIdIsNormal(slot->data.xmin) ||
2629 {
2630 changed = true;
2631 slot->data.xmin = feedbackXmin;
2633 }
2637 {
2638 changed = true;
2641 }
2642 SpinLockRelease(&slot->mutex);
2643
2644 if (changed)
2645 {
2648 }
2649}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1226
TransactionId catalog_xmin
Definition slot.h:122
TransactionId effective_catalog_xmin
Definition slot.h:210
TransactionId effective_xmin
Definition slot.h:209
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire(), SpinLockRelease(), TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1801 of file walsender.c.

1802{
1804
1805 /*
1806 * If we are running in a standby, there is no need to wake up walsenders.
1807 * This is because we do not support syncing slots to cascading standbys,
1808 * so there are no walsenders waiting for standbys to catch up.
1809 */
1810 if (RecoveryInProgress())
1811 return;
1812
1815}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3080
#define SlotIsPhysical(slot)
Definition slot.h:287
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1680 of file walsender.c.

1681{
1682 for (;;)
1683 {
1684 long sleeptime;
1685
1686 /* Check for input from the client */
1688
1689 /* die if timeout was reached */
1691
1692 /*
1693 * During shutdown, die if the shutdown timeout expires. Call this
1694 * before WalSndComputeSleeptime() so the timeout is considered when
1695 * computing sleep time.
1696 */
1698
1699 /* Send keepalive if the time has come */
1701
1702 if (!pq_is_send_pending())
1703 break;
1704
1706
1707 /* Sleep until something happens or we time out */
1710
1711 /* Clear any already-pending wakeups */
1713
1715
1716 /* Process any requests or signals received recently */
1718
1719 /* Try to flush pending output to the client */
1720 if (pq_flush_if_writable() != 0)
1722 }
1723
1724 /* reactivate latch so WalSndLoop knows to continue */
1726}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
struct Latch * MyLatch
Definition globals.c:65
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
#define pq_flush_if_writable()
Definition libpq.h:50
#define pq_is_send_pending()
Definition libpq.h:51
#define WL_SOCKET_READABLE
#define WL_SOCKET_WRITEABLE
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition walsender.c:4033
static void WalSndCheckTimeOut(void)
Definition walsender.c:2942
static void ProcessRepliesIfAny(void)
Definition walsender.c:2321
static void WalSndKeepaliveIfNecessary(void)
Definition walsender.c:4403
static void WalSndCheckShutdownTimeout(void)
Definition walsender.c:2972
static void WalSndHandleConfigReload(void)
Definition walsender.c:1657
static pg_noreturn void WalSndShutdown(void)
Definition walsender.c:413
static long WalSndComputeSleeptime(TimestampTz now)
Definition walsender.c:2885

References CHECK_FOR_INTERRUPTS, fb(), GetCurrentTimestamp(), MyLatch, pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), SetLatch(), WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2321 of file walsender.c.

2322{
2323 unsigned char firstchar;
2324 int maxmsglen;
2325 int r;
2326 bool received = false;
2327
2329
2330 /*
2331 * If we already received a CopyDone from the frontend, any subsequent
2332 * message is the beginning of a new command, and should be processed in
2333 * the main processing loop.
2334 */
2335 while (!streamingDoneReceiving)
2336 {
2339 if (r < 0)
2340 {
2341 /* unexpected error or EOF */
2344 errmsg("unexpected EOF on standby connection")));
2345 proc_exit(0);
2346 }
2347 if (r == 0)
2348 {
2349 /* no data available without blocking */
2350 pq_endmsgread();
2351 break;
2352 }
2353
2354 /* Validate message type and set packet size limit */
2355 switch (firstchar)
2356 {
2357 case PqMsg_CopyData:
2359 break;
2360 case PqMsg_CopyDone:
2361 case PqMsg_Terminate:
2363 break;
2364 default:
2365 ereport(FATAL,
2367 errmsg("invalid standby message type \"%c\"",
2368 firstchar)));
2369 maxmsglen = 0; /* keep compiler quiet */
2370 break;
2371 }
2372
2373 /* Read the message contents */
2376 {
2379 errmsg("unexpected EOF on standby connection")));
2380 proc_exit(0);
2381 }
2382
2383 /* ... and process it */
2384 switch (firstchar)
2385 {
2386 /*
2387 * PqMsg_CopyData means a standby reply wrapped in a CopyData
2388 * packet.
2389 */
2390 case PqMsg_CopyData:
2392 received = true;
2393 break;
2394
2395 /*
2396 * PqMsg_CopyDone means the standby requested to finish
2397 * streaming. Reply with CopyDone, if we had not sent that
2398 * already.
2399 */
2400 case PqMsg_CopyDone:
2402 {
2404 streamingDoneSending = true;
2405 }
2406
2408 received = true;
2409 break;
2410
2411 /*
2412 * PqMsg_Terminate means that the standby is closing down the
2413 * socket.
2414 */
2415 case PqMsg_Terminate:
2416 proc_exit(0);
2417
2418 default:
2419 Assert(false); /* NOT REACHED */
2420 }
2421 }
2422
2423 /*
2424 * Save the last reply timestamp if we've received at least one reply.
2425 */
2426 if (received)
2427 {
2430 }
2431}
#define COMMERROR
Definition elog.h:34
#define FATAL
Definition elog.h:42
void proc_exit(int code)
Definition ipc.c:105
#define pq_putmessage_noblock(msgtype, s, len)
Definition libpq.h:54
int pq_getbyte_if_available(unsigned char *c)
Definition pqcomm.c:1004
void pq_endmsgread(void)
Definition pqcomm.c:1166
#define PqMsg_Terminate
Definition protocol.h:28
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static bool waiting_for_ping_response
Definition walsender.c:207
static TimestampTz last_processing
Definition walsender.c:198
static bool streamingDoneSending
Definition walsender.c:225
static void ProcessStandbyMessage(void)
Definition walsender.c:2437
static bool streamingDoneReceiving
Definition walsender.c:226

References Assert, COMMERROR, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, FATAL, fb(), GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2693 of file walsender.c.

2694{
2699 TimestampTz replyTime;
2700
2701 /*
2702 * Decipher the reply message. The caller already consumed the msgtype
2703 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2704 * of this message.
2705 */
2706 replyTime = pq_getmsgint64(&reply_message);
2711
2713 {
2714 char *replyTimeStr;
2715
2716 /* Copy because timestamptz_to_str returns a static buffer */
2718
2719 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2724 replyTimeStr);
2725
2727 }
2728
2729 /*
2730 * Update shared state for this WalSender process based on reply data from
2731 * standby.
2732 */
2733 {
2735
2736 SpinLockAcquire(&walsnd->mutex);
2737 walsnd->replyTime = replyTime;
2738 SpinLockRelease(&walsnd->mutex);
2739 }
2740
2741 /*
2742 * Unset WalSender's xmins if the feedback message values are invalid.
2743 * This happens when the downstream turned hot_standby_feedback off.
2744 */
2747 {
2749 if (MyReplicationSlot != NULL)
2751 return;
2752 }
2753
2754 /*
2755 * Check that the provided xmin/epoch are sane, that is, not in the future
2756 * and not so far back as to be already wrapped around. Ignore if not.
2757 */
2760 return;
2761
2764 return;
2765
2766 /*
2767 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2768 * the xmin will be taken into account by GetSnapshotData() /
2769 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2770 * thereby prevent the generation of cleanup conflicts on the standby
2771 * server.
2772 *
2773 * There is a small window for a race condition here: although we just
2774 * checked that feedbackXmin precedes nextXid, the nextXid could have
2775 * gotten advanced between our fetching it and applying the xmin below,
2776 * perhaps far enough to make feedbackXmin wrap around. In that case the
2777 * xmin we set here would be "in the future" and have no effect. No point
2778 * in worrying about this since it's too late to save the desired data
2779 * anyway. Assuming that the standby sends us an increasing sequence of
2780 * xmins, this could only happen during the first reply cycle, else our
2781 * own xmin would prevent nextXid from advancing so far.
2782 *
2783 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2784 * is assumed atomic, and there's no real need to prevent concurrent
2785 * horizon determinations. (If we're moving our xmin forward, this is
2786 * obviously safe, and if we're moving it backwards, well, the data is at
2787 * risk already since a VACUUM could already have determined the horizon.)
2788 *
2789 * If we're using a replication slot we reserve the xmin via that,
2790 * otherwise via the walsender's PGPROC entry. We can only track the
2791 * catalog xmin separately when using a slot, so we store the least of the
2792 * two provided when not using a slot.
2793 *
2794 * XXX: It might make sense to generalize the ephemeral slot concept and
2795 * always use the slot mechanism to handle the feedback xmin.
2796 */
2797 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2799 else
2800 {
2804 else
2806 }
2807}
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1870
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
bool message_level_is_interesting(int elevel)
Definition elog.c:285
#define DEBUG2
Definition elog.h:30
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition walsender.c:2613
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition walsender.c:2662

References DEBUG2, elog, fb(), InvalidTransactionId, message_level_is_interesting(), MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, SpinLockAcquire(), SpinLockRelease(), timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2437 of file walsender.c.

2438{
2439 char msgtype;
2440
2441 /*
2442 * Check message type from the first byte.
2443 */
2445
2446 switch (msgtype)
2447 {
2450 break;
2451
2454 break;
2455
2458 break;
2459
2460 default:
2463 errmsg("unexpected message type \"%c\"", msgtype)));
2464 proc_exit(0);
2465 }
2466}
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static void ProcessStandbyHSFeedbackMessage(void)
Definition walsender.c:2693
static void ProcessStandbyPSRequestMessage(void)
Definition walsender.c:2813
static void ProcessStandbyReplyMessage(void)
Definition walsender.c:2505

References COMMERROR, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, fb(), pq_getmsgbyte(), PqReplMsg_HotStandbyFeedback, PqReplMsg_PrimaryStatusRequest, PqReplMsg_StandbyStatusUpdate, proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyPSRequestMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyPSRequestMessage()

static void ProcessStandbyPSRequestMessage ( void  )
static

Definition at line 2813 of file walsender.c.

2814{
2821 TimestampTz replyTime;
2822
2823 /*
2824 * This shouldn't happen because we don't support getting the primary
2825 * status message from a standby.
2826 */
2827 if (RecoveryInProgress())
2828 elog(ERROR, "the primary status is unavailable during recovery");
2829
2830 replyTime = pq_getmsgint64(&reply_message);
2831
2832 /*
2833 * Update shared state for this WalSender process based on reply data from
2834 * standby.
2835 */
2836 SpinLockAcquire(&walsnd->mutex);
2837 walsnd->replyTime = replyTime;
2838 SpinLockRelease(&walsnd->mutex);
2839
2840 /*
2841 * Consider transactions in the current database, as only these are the
2842 * ones replicated.
2843 */
2846
2847 /*
2848 * Update the oldest xid for standby transmission if an older prepared
2849 * transaction exists and is currently in the commit phase.
2850 */
2854
2858 lsn = GetXLogWriteRecPtr();
2859
2860 elog(DEBUG2, "sending primary status");
2861
2862 /* construct the message... */
2869
2870 /* ... and send it wrapped in CopyData */
2872}
int64_t int64
Definition c.h:621
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2845
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
Definition transam.h:441
#define U64FromFullTransactionId(x)
Definition transam.h:49
#define TransactionIdIsValid(xid)
Definition transam.h:41
TransactionId TwoPhaseGetOldestXidInCommit(void)
Definition twophase.c:2835
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:283
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:10124

References StringInfoData::data, DEBUG2, elog, ERROR, fb(), FullTransactionIdFromAllowableAt(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), GetXLogWriteRecPtr(), InvalidXLogRecPtr, StringInfoData::len, MyWalSnd, output_message, pq_getmsgint64(), pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_PrimaryStatusUpdate, ReadNextFullTransactionId(), RecoveryInProgress(), reply_message, resetStringInfo(), SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, TransactionIdPrecedes(), TwoPhaseGetOldestXidInCommit(), and U64FromFullTransactionId.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2505 of file walsender.c.

2506{
2508 flushPtr,
2509 applyPtr;
2510 bool replyRequested;
2511 TimeOffset writeLag,
2512 flushLag,
2513 applyLag;
2514 bool clearLagTimes;
2516 TimestampTz replyTime;
2517
2521
2522 /* the caller already consumed the msgtype byte */
2526 replyTime = pq_getmsgint64(&reply_message);
2528
2530 {
2531 char *replyTimeStr;
2532
2533 /* Copy because timestamptz_to_str returns a static buffer */
2535
2536 elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
2540 replyRequested ? " (reply requested)" : "",
2541 replyTimeStr);
2542
2544 }
2545
2546 /* See if we can compute the round-trip lag for these positions. */
2551
2552 /*
2553 * If the standby reports that it has fully replayed the WAL, and the
2554 * write/flush/apply positions remain unchanged across two consecutive
2555 * reply messages, forget the lag times measured when it last
2556 * wrote/flushed/applied a WAL record.
2557 *
2558 * The second message with unchanged positions typically results from
2559 * wal_receiver_status_interval expiring on the standby, so lag values are
2560 * usually cleared after that interval when there is no activity. This
2561 * avoids displaying stale lag data until more WAL traffic arrives.
2562 */
2566
2570
2571 /* Send a reply if the standby requested one. */
2572 if (replyRequested)
2574
2575 /*
2576 * Update shared state for this WalSender process based on reply data from
2577 * standby.
2578 */
2579 {
2581
2582 SpinLockAcquire(&walsnd->mutex);
2583 walsnd->write = writePtr;
2584 walsnd->flush = flushPtr;
2585 walsnd->apply = applyPtr;
2586 if (writeLag != -1 || clearLagTimes)
2587 walsnd->writeLag = writeLag;
2588 if (flushLag != -1 || clearLagTimes)
2589 walsnd->flushLag = flushLag;
2590 if (applyLag != -1 || clearLagTimes)
2591 walsnd->applyLag = applyLag;
2592 walsnd->replyTime = replyTime;
2593 SpinLockRelease(&walsnd->mutex);
2594 }
2595
2598
2599 /*
2600 * Advance our local xmin horizon when the client confirmed a flush.
2601 */
2603 {
2606 else
2608 }
2609}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition logical.c:1816
#define SlotIsLogical(slot)
Definition slot.h:288
void SyncRepReleaseWaiters(void)
Definition syncrep.c:484
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
static XLogRecPtr sentPtr
Definition walsender.c:190
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition walsender.c:2472
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition walsender.c:4380
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition walsender.c:4499

References am_cascading_walsender, DEBUG2, elog, fb(), GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, sentPtr, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 511 of file walsender.c.

512{
513#define READ_REPLICATION_SLOT_COLS 3
514 ReplicationSlot *slot;
517 TupleDesc tupdesc;
519 bool nulls[READ_REPLICATION_SLOT_COLS];
520
522 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
523 TEXTOID, -1, 0);
524 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
525 TEXTOID, -1, 0);
526 /* TimeLineID is unsigned, so int4 is not wide enough. */
527 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
528 INT8OID, -1, 0);
529 TupleDescFinalize(tupdesc);
530
531 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
532
534 slot = SearchNamedReplicationSlot(cmd->slotname, false);
535 if (slot == NULL || !slot->in_use)
536 {
538 }
539 else
540 {
542 int i = 0;
543
544 /* Copy slot contents while holding spinlock */
545 SpinLockAcquire(&slot->mutex);
546 slot_contents = *slot;
547 SpinLockRelease(&slot->mutex);
549
550 if (OidIsValid(slot_contents.data.database))
553 errmsg("cannot use %s with a logical replication slot",
554 "READ_REPLICATION_SLOT"));
555
556 /* slot type */
557 values[i] = CStringGetTextDatum("physical");
558 nulls[i] = false;
559 i++;
560
561 /* start LSN */
562 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
563 {
564 char xloc[64];
565
566 snprintf(xloc, sizeof(xloc), "%X/%08X",
567 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
569 nulls[i] = false;
570 }
571 i++;
572
573 /* timeline this WAL was produced on */
574 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
575 {
579
580 /*
581 * While in recovery, use as timeline the currently-replaying one
582 * to get the LSN position's history.
583 */
584 if (RecoveryInProgress())
586 else
588
593 nulls[i] = false;
594 }
595 i++;
596
598 }
599
602 do_tup_output(tstate, values, nulls);
604}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition timeline.c:77
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition timeline.c:545
#define OidIsValid(objectId)
Definition c.h:858
@ LW_SHARED
Definition lwlock.h:105
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:548
Definition pg_list.h:54
bool in_use
Definition slot.h:186
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg, ERROR, fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire(), SpinLockRelease(), tliOfPointInHistory(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsValid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 611 of file walsender.c.

612{
614 TupleDesc tupdesc;
617 char path[MAXPGPATH];
618 int fd;
621 Size len;
622
624
625 /*
626 * Reply with a result set with one row, and two columns. The first col is
627 * the name of the history file, 2nd is the contents.
628 */
629 tupdesc = CreateTemplateTupleDesc(2);
630 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
631 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
632 TupleDescFinalize(tupdesc);
633
635 TLHistoryFilePath(path, cmd->timeline);
636
637 /* Send a RowDescription message */
638 dest->rStartup(dest, CMD_SELECT, tupdesc);
639
640 /* Send a DataRow message */
642 pq_sendint16(&buf, 2); /* # of columns */
644 pq_sendint32(&buf, len); /* col1 len */
646
648 if (fd < 0)
651 errmsg("could not open file \"%s\": %m", path)));
652
653 /* Determine file length and send it to client */
655 if (histfilelen < 0)
658 errmsg("could not seek to end of file \"%s\": %m", path)));
659 if (lseek(fd, 0, SEEK_SET) != 0)
662 errmsg("could not seek to beginning of file \"%s\": %m", path)));
663
664 pq_sendint32(&buf, histfilelen); /* col2 len */
665
667 while (bytesleft > 0)
668 {
670 int nread;
671
673 nread = read(fd, rbuf.data, sizeof(rbuf));
675 if (nread < 0)
678 errmsg("could not read file \"%s\": %m",
679 path)));
680 else if (nread == 0)
683 errmsg("could not read file \"%s\": read %d of %zu",
684 path, nread, (Size) bytesleft)));
685
686 pq_sendbytes(&buf, rbuf.data, nread);
687 bytesleft -= nread;
688 }
689
690 if (CloseTransientFile(fd) != 0)
693 errmsg("could not close file \"%s\": %m", path)));
694
696}
#define PG_BINARY
Definition c.h:1374
size_t Size
Definition c.h:689
int errcode_for_file_access(void)
Definition elog.c:898
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define read(a, b, c)
Definition win32.h:13
@ CMD_SELECT
Definition nodes.h:275
#define ERRCODE_DATA_CORRUPTED
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition pqformat.h:136
static int fd(const char *x, int i)
#define PqMsg_DataRow
Definition protocol.h:43
TimeLineID timeline
Definition replnodes.h:120
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:67
static void pgstat_report_wait_end(void)
Definition wait_event.h:83
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), TupleDescFinalize(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1492 of file walsender.c.

1493{
1495 QueryCompletion qc;
1496
1497 /* make sure that our requirements are still fulfilled */
1499
1501
1502 ReplicationSlotAcquire(cmd->slotname, true, true);
1503
1504 /*
1505 * Force a disconnect, so that the decoding code doesn't need to care
1506 * about an eventual switch from running in recovery, to running in a
1507 * normal environment. Client code is expected to handle reconnects.
1508 */
1510 {
1511 ereport(LOG,
1512 (errmsg("terminating walsender process after promotion")));
1513 got_STOPPING = true;
1514 }
1515
1516 /*
1517 * Create our decoding context, making it start at the previously ack'ed
1518 * position.
1519 *
1520 * Do this before sending a CopyBothResponse message, so that any errors
1521 * are reported early.
1522 */
1524 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1526 .segment_open = WalSndSegmentOpen,
1527 .segment_close = wal_segment_close),
1531
1533
1534 /* Send a CopyBothResponse message, and start streaming */
1536 pq_sendbyte(&buf, 0);
1537 pq_sendint16(&buf, 0);
1539 pq_flush();
1540
1541 /* Start reading WAL from the oldest required WAL. */
1544
1545 /*
1546 * Report the location after which we'll send out further commits as the
1547 * current sentPtr.
1548 */
1550
1551 /* Also update the sent position status in shared memory */
1555
1556 replication_active = true;
1557
1559
1560 /* Main loop of walsender */
1562
1565
1566 replication_active = false;
1567 if (got_STOPPING)
1568 proc_exit(0);
1570
1571 /* Get out of COPY mode (CommandComplete). */
1573 EndCommand(&qc, DestRemote, false);
1574}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition dest.c:205
@ DestRemote
Definition dest.h:89
#define pq_flush()
Definition libpq.h:49
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:494
#define PqMsg_CopyBothResponse
Definition protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:629
XLogReaderState * reader
Definition logical.h:42
XLogRecPtr startpoint
Definition replnodes.h:97
slock_t mutex
XLogRecPtr sentPtr
void SyncRepInitConfig(void)
Definition syncrep.c:455
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition walsender.c:3008
static LogicalDecodingContext * logical_decoding_ctx
Definition walsender.c:243
static void XLogSendLogical(void)
Definition walsender.c:3632
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition xlogreader.c:233

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg, fb(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire(), SpinLockRelease(), StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd * cmd)
static

Definition at line 844 of file walsender.c.

845{
849
850 /* create xlogreader for physical replication */
851 xlogreader =
853 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
854 .segment_close = wal_segment_close),
855 NULL);
856
857 if (!xlogreader)
860 errmsg("out of memory"),
861 errdetail("Failed while allocating a WAL reading processor.")));
862
863 /*
864 * We assume here that we're logging enough information in the WAL for
865 * log-shipping, since this is checked in PostmasterMain().
866 *
867 * NOTE: wal_level can only change at shutdown, so in most cases it is
868 * difficult for there to be WAL data that we can still see that was
869 * written at wal_level='minimal'.
870 */
871
872 if (cmd->slotname)
873 {
874 ReplicationSlotAcquire(cmd->slotname, true, true);
878 errmsg("cannot use a logical replication slot for physical replication")));
879
880 /*
881 * We don't need to verify the slot's restart_lsn here; instead we
882 * rely on the caller requesting the starting point to use. If the
883 * WAL segment doesn't exist, we'll fail later.
884 */
885 }
886
887 /*
888 * Select the timeline. If it was given explicitly by the client, use
889 * that. Otherwise use the timeline of the last replayed record.
890 */
894 else
896
897 if (cmd->timeline != 0)
898 {
900
901 sendTimeLine = cmd->timeline;
902 if (sendTimeLine == FlushTLI)
903 {
906 }
907 else
908 {
910
912
913 /*
914 * Check that the timeline the client requested exists, and the
915 * requested start location is on that timeline.
916 */
921
922 /*
923 * Found the requested timeline in the history. Check that
924 * requested startpoint is on that timeline in our history.
925 *
926 * This is quite loose on purpose. We only check that we didn't
927 * fork off the requested timeline before the switchpoint. We
928 * don't check that we switched *to* it before the requested
929 * starting point. This is because the client can legitimately
930 * request to start replication from the beginning of the WAL
931 * segment that contains switchpoint, but on the new timeline, so
932 * that it doesn't end up with a partial segment. If you ask for
933 * too old a starting point, you'll get an error later when we
934 * fail to find the requested WAL segment in pg_wal.
935 *
936 * XXX: we could be more strict here and only allow a startpoint
937 * that's older than the switchpoint, if it's still in the same
938 * WAL segment.
939 */
941 switchpoint < cmd->startpoint)
942 {
944 errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
946 cmd->timeline),
947 errdetail("This server's history forked from timeline %u at %X/%08X.",
948 cmd->timeline,
950 }
952 }
953 }
954 else
955 {
959 }
960
962
963 /* If there is nothing to stream, don't even enter COPY mode */
965 {
966 /*
967 * When we first start replication the standby will be behind the
968 * primary. For some applications, for example synchronous
969 * replication, it is important to have a clear state for this initial
970 * catchup mode, so we can trigger actions when we change streaming
971 * state later. We may stay in this state for a long time, which is
972 * exactly why we want to be able to monitor whether or not we are
973 * still here.
974 */
976
977 /* Send a CopyBothResponse message, and start streaming */
979 pq_sendbyte(&buf, 0);
980 pq_sendint16(&buf, 0);
982 pq_flush();
983
984 /*
985 * Don't allow a request to stream from a future point in WAL that
986 * hasn't been flushed to disk in this server yet.
987 */
988 if (FlushPtr < cmd->startpoint)
989 {
991 errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
994 }
995
996 /* Start streaming from the requested point */
997 sentPtr = cmd->startpoint;
998
999 /* Initialize shared memory status, too */
1003
1005
1006 /* Main loop of walsender */
1007 replication_active = true;
1008
1010
1011 replication_active = false;
1012 if (got_STOPPING)
1013 proc_exit(0);
1015
1017 }
1018
1019 if (cmd->slotname)
1021
1022 /*
1023 * Copy is finished now. Send a single-row result set indicating the next
1024 * timeline.
1025 */
1027 {
1028 char startpos_str[8 + 1 + 8 + 1];
1031 TupleDesc tupdesc;
1032 Datum values[2];
1033 bool nulls[2] = {0};
1034
1035 snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
1037
1039
1040 /*
1041 * Need a tuple descriptor representing two columns. int8 may seem
1042 * like a surprising data type for this, but in theory int4 would not
1043 * be wide enough for this, as TimeLineID is unsigned.
1044 */
1045 tupdesc = CreateTemplateTupleDesc(2);
1046 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1047 INT8OID, -1, 0);
1048 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1049 TEXTOID, -1, 0);
1050 TupleDescFinalize(tupdesc);
1051
1052 /* prepare for projection of tuple */
1054
1057
1058 /* send it to dest */
1059 do_tup_output(tstate, values, nulls);
1060
1062 }
1063
1064 /* Send CommandComplete message */
1065 EndReplicationCommand("START_STREAMING");
1066}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition timeline.c:573
int errdetail(const char *fmt,...) pg_attribute_printf(1
void list_free_deep(List *list)
Definition list.c:1560
TimeLineID timeline
Definition replnodes.h:96
static void XLogSendPhysical(void)
Definition walsender.c:3322
int wal_segment_size
Definition xlog.c:150
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition xlogreader.c:108

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire(), SpinLockRelease(), StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsValid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2662 of file walsender.c.

2663{
2665 TransactionId nextXid;
2667
2671
2672 if (xid <= nextXid)
2673 {
2674 if (epoch != nextEpoch)
2675 return false;
2676 }
2677 else
2678 {
2679 if (epoch + 1 != nextEpoch)
2680 return false;
2681 }
2682
2683 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2684 return false; /* epoch OK, but it's wrapped around */
2685
2686 return true;
2687}
#define EpochFromFullTransactionId(x)
Definition transam.h:47
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define XidFromFullTransactionId(x)
Definition transam.h:48
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, fb(), ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 702 of file walsender.c.

703{
704 MemoryContext mcxt;
706 off_t offset = 0;
708
709 /*
710 * parsing the manifest will use the cryptohash stuff, which requires a
711 * resource owner
712 */
717
718 /* Prepare to read manifest data into a temporary context. */
720 "incremental backup information",
723
724 /* Send a CopyInResponse message */
726 pq_sendbyte(&buf, 0);
727 pq_sendint16(&buf, 0);
729 pq_flush();
730
731 /* Receive packets from client until done. */
732 while (HandleUploadManifestPacket(&buf, &offset, ib))
733 ;
734
735 /* Finish up manifest processing. */
737
738 /*
739 * Discard any old manifest information and arrange to preserve the new
740 * information we just got.
741 *
742 * We assume that MemoryContextDelete and MemoryContextSetParent won't
743 * fail, and thus we shouldn't end up bailing out of here in such a way as
744 * to leave dangling pointers.
745 */
751
752 /* clean up the resource owner we created */
754}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:689
MemoryContext CacheMemoryContext
Definition mcxt.c:170
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
void pq_endmessage_reuse(StringInfo buf)
Definition pqformat.c:313
#define PqMsg_CopyInResponse
Definition protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition resowner.c:1016
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition walsender.c:768
static MemoryContext uploaded_manifest_mcxt
Definition walsender.c:173

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, fb(), FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckShutdownTimeout()

static void WalSndCheckShutdownTimeout ( void  )
static

Definition at line 2972 of file walsender.c.

2973{
2975
2976 /* Do nothing if shutdown has not been requested yet */
2977 if (!(got_STOPPING || got_SIGUSR2))
2978 return;
2979
2980 /* Terminate immediately if the timeout is set to 0 */
2983
2984 /*
2985 * Record the shutdown request timestamp even if
2986 * wal_sender_shutdown_timeout is disabled (-1), since the setting may
2987 * change during shutdown and the timestamp will be needed in that case.
2988 */
2990 {
2992 return;
2993 }
2994
2995 /* Do not check the timeout if it's disabled */
2997 return;
2998
2999 /* Terminate immediately if the timeout expires */
3004}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1789
static volatile sig_atomic_t got_SIGUSR2
Definition walsender.c:232
int wal_sender_shutdown_timeout
Definition walsender.c:146
static TimestampTz shutdown_request_timestamp
Definition walsender.c:210
static pg_noreturn void WalSndDoneImmediate(void)
Definition walsender.c:3719

References GetCurrentTimestamp(), got_SIGUSR2, got_STOPPING, now(), shutdown_request_timestamp, TimestampDifferenceExceeds(), wal_sender_shutdown_timeout, and WalSndDoneImmediate().

Referenced by ProcessPendingWrites(), WalSndDone(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2942 of file walsender.c.

2943{
2945
2946 /* don't bail out if we're doing something that doesn't require timeouts */
2947 if (last_reply_timestamp <= 0)
2948 return;
2949
2952
2954 {
2955 /*
2956 * Since typically expiration of replication timeout means
2957 * communication problem, we don't send the error message to the
2958 * standby.
2959 */
2961 (errmsg("terminating walsender process due to replication timeout")));
2962
2964 }
2965}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
int wal_sender_timeout
Definition walsender.c:143

References COMMERROR, ereport, errmsg, fb(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2885 of file walsender.c.

2886{
2888 long sleeptime = 10000; /* 10 s */
2889
2891 {
2892 /*
2893 * At the latest stop sleeping once wal_sender_timeout has been
2894 * reached.
2895 */
2898
2899 /*
2900 * If no ping has been sent yet, wakeup when it's time to do so.
2901 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2902 * the timeout passed without a response.
2903 */
2906 wal_sender_timeout / 2);
2907
2908 /* Compute relative time until wakeup. */
2910 }
2911
2913 {
2914 long shutdown_sleeptime;
2915
2918
2920
2921 /* Choose the earliest wakeup. */
2924 }
2925
2926 return sleeptime;
2927}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765

References fb(), last_reply_timestamp, now(), shutdown_request_timestamp, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_shutdown_timeout, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndDone(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3770 of file walsender.c.

3771{
3773
3774 /* ... let's just be real sure we're caught up ... */
3775 send_data();
3776
3777 /*
3778 * To figure out whether all WAL has successfully been replicated, check
3779 * flush location if valid, write otherwise. Tools like pg_receivewal will
3780 * usually (unless in synchronous mode) return an invalid flush location.
3781 */
3784
3787 {
3788 QueryCompletion qc;
3789
3791
3792 /* Inform the standby that XLOG streaming is done */
3794 EndCommandExtended(&qc, DestRemote, false, true);
3796
3797 /*
3798 * Reset last_reply_timestamp so subsequent WalSndComputeSleeptime()
3799 * calls ignore wal_sender_timeout during shutdown.
3800 */
3802
3803 /*
3804 * Do not call pq_flush() here, since it can block indefinitely while
3805 * waiting for the socket to become writable, preventing
3806 * wal_sender_shutdown_timeout from being enforced. Instead, use the
3807 * walsender nonblocking flush path so the shutdown timeout continues
3808 * to be checked while the send buffer drains.
3809 */
3810 for (;;)
3811 {
3812 long sleeptime;
3813
3814 /*
3815 * During shutdown, die if the shutdown timeout expires. Call this
3816 * before WalSndComputeSleeptime() so the timeout is considered
3817 * when computing sleep time.
3818 */
3820
3821 if (!pq_is_send_pending())
3822 break;
3823
3825
3826 /* Sleep until something happens or we time out */
3829
3830 /* Clear any already-pending wakeups */
3832
3834
3835 /* Try to flush pending output to the client */
3836 if (pq_flush_if_writable() != 0)
3838 }
3839
3840 proc_exit(0);
3841 }
3844}
void EndCommandExtended(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output, bool noblock)
Definition dest.c:170
XLogRecPtr flush
XLogRecPtr write
static bool WalSndCaughtUp
Definition walsender.c:229
static bool shutdown_stream_done_queued
Definition walsender.c:217

References Assert, CHECK_FOR_INTERRUPTS, DestRemote, EndCommandExtended(), fb(), WalSnd::flush, GetCurrentTimestamp(), InvalidXLogRecPtr, last_reply_timestamp, MyLatch, MyWalSnd, pq_flush_if_writable, pq_is_send_pending, proc_exit(), ResetLatch(), sentPtr, SetQueryCompletion(), shutdown_stream_done_queued, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckShutdownTimeout(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndShutdown(), WalSndWait(), WL_SOCKET_WRITEABLE, WalSnd::write, and XLogRecPtrIsValid.

Referenced by WalSndLoop().

◆ WalSndDoneImmediate()

static void WalSndDoneImmediate ( void  )
static

Definition at line 3719 of file walsender.c.

3720{
3722
3723 if ((state == WALSNDSTATE_CATCHUP ||
3727 {
3728 QueryCompletion qc;
3729
3730 /* Try to inform receiver that XLOG streaming is done */
3732 EndCommandExtended(&qc, DestRemote, false, true);
3734
3735 /*
3736 * Note that the output buffer may be full during the forced shutdown
3737 * of walsender. If pq_flush() is called at that time, the walsender
3738 * process will be stuck. Therefore, call pq_flush_if_writable()
3739 * instead. Successful reception of the done message with the
3740 * walsender forced into a shutdown is not guaranteed.
3741 */
3743 }
3744
3745 /*
3746 * Prevent ereport from attempting to send any more messages to the
3747 * standby. Otherwise, it can cause the process to get stuck if the output
3748 * buffers are full.
3749 */
3752
3754 (errmsg("terminating walsender process due to replication shutdown timeout"),
3755 errdetail("Walsender process might have been terminated before all WAL data was replicated to the receiver.")));
3756
3757 proc_exit(0);
3758}
@ DestNone
Definition dest.h:87
CommandDest whereToSendOutput
Definition postgres.c:97
@ WALSNDSTATE_STREAMING

References DestNone, DestRemote, EndCommandExtended(), ereport, errdetail(), errmsg, fb(), MyWalSnd, pq_flush_if_writable, proc_exit(), SetQueryCompletion(), shutdown_stream_done_queued, WalSnd::state, WALSNDSTATE_CATCHUP, WALSNDSTATE_STOPPING, WALSNDSTATE_STREAMING, WARNING, and whereToSendOutput.

Referenced by WalSndCheckShutdownTimeout().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 377 of file walsender.c.

378{
383
384 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
386
387 if (MyReplicationSlot != NULL)
389
391
392 replication_active = false;
393
394 /*
395 * If there is a transaction in progress, it will clean up our
396 * ResourceOwner, but if a replication command set up a resource owner
397 * without a transaction, we've got to clean that up now.
398 */
401
403 proc_exit(0);
404
405 /* Revert back to startup state */
407}
void pgaio_error_cleanup(void)
Definition aio.c:1175
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition lwlock.c:1866
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:868
WALOpenSegment seg
Definition xlogreader.h:271
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5040

References ConditionVariableCancelSleep(), fb(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 4174 of file walsender.c.

4175{
4176 switch (state)
4177 {
4179 return "startup";
4180 case WALSNDSTATE_BACKUP:
4181 return "backup";
4183 return "catchup";
4185 return "streaming";
4187 return "stopping";
4188 }
4189 return "UNKNOWN";
4190}
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndHandleConfigReload()

static void WalSndHandleConfigReload ( void  )
static

Definition at line 1657 of file walsender.c.

1658{
1660 return;
1661
1662 ConfigReloadPending = false;
1665
1666 /*
1667 * Recheck and release any now-satisfied waiters after config reload
1668 * changes synchronous replication requirements (e.g., reducing the number
1669 * of sync standbys or changing the standby names).
1670 */
1673}
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27

References am_cascading_walsender, ConfigReloadPending, PGC_SIGHUP, ProcessConfigFile(), SyncRepInitConfig(), and SyncRepReleaseWaiters().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 4091 of file walsender.c.

4092{
4093 int i;
4094
4095 for (i = 0; i < max_wal_senders; i++)
4096 {
4098 pid_t pid;
4099
4100 SpinLockAcquire(&walsnd->mutex);
4101 pid = walsnd->pid;
4102 SpinLockRelease(&walsnd->mutex);
4103
4104 if (pid == 0)
4105 continue;
4106
4108 }
4109}
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:288
@ PROCSIG_WALSND_INIT_STOPPING
Definition procsignal.h:35

References fb(), i, INVALID_PROC_NUMBER, max_wal_senders, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4403 of file walsender.c.

4404{
4406
4407 /*
4408 * Don't send keepalive messages if timeouts are globally disabled or
4409 * we're doing something not partaking in timeouts.
4410 */
4412 return;
4413
4415 return;
4416
4417 /*
4418 * If half of wal_sender_timeout has lapsed without receiving any reply
4419 * from the standby, send a keep-alive message to the standby requesting
4420 * an immediate reply.
4421 */
4423 wal_sender_timeout / 2);
4425 {
4427
4428 /* Try to flush pending output to the client */
4429 if (pq_flush_if_writable() != 0)
4431 }
4432}

References fb(), InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3228 of file walsender.c.

3229{
3231
3232 Assert(walsnd != NULL);
3233
3234 MyWalSnd = NULL;
3235
3236 SpinLockAcquire(&walsnd->mutex);
3237 /* Mark WalSnd struct as no longer being in use. */
3238 walsnd->pid = 0;
3239 SpinLockRelease(&walsnd->mutex);
3240}

References Assert, fb(), MyWalSnd, SpinLockAcquire(), and SpinLockRelease().

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3938 of file walsender.c.

3939{
3940 got_SIGUSR2 = true;
3942}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 3008 of file walsender.c.

3009{
3011
3012 /*
3013 * Initialize the last reply timestamp. That enables timeout processing
3014 * from hereon.
3015 */
3018
3019 /*
3020 * Loop until we reach the end of this timeline or the client requests to
3021 * stop streaming.
3022 */
3023 for (;;)
3024 {
3025 /* Clear any already-pending wakeups */
3027
3029
3030 /* Process any requests or signals received recently */
3032
3033 /* Check for input from the client */
3035
3036 /*
3037 * If we have received CopyDone from the client, sent CopyDone
3038 * ourselves, and the output buffer is empty, it's time to exit
3039 * streaming.
3040 */
3043 break;
3044
3045 /*
3046 * If we don't have any pending data in the output buffer, try to send
3047 * some more. If there is some, we don't bother to call send_data
3048 * again until we've flushed it ... but we'd better assume we are not
3049 * caught up.
3050 */
3051 if (!pq_is_send_pending())
3052 send_data();
3053 else
3054 WalSndCaughtUp = false;
3055
3056 /* Try to flush pending output to the client */
3057 if (pq_flush_if_writable() != 0)
3059
3060 /* If nothing remains to be sent right now ... */
3062 {
3063 /*
3064 * If we're in catchup state, move to streaming. This is an
3065 * important state change for users to know about, since before
3066 * this point data loss might occur if the primary dies and we
3067 * need to failover to the standby. The state change is also
3068 * important for synchronous replication, since commits that
3069 * started to wait at that point might wait for some time.
3070 */
3072 {
3074 (errmsg_internal("\"%s\" has now caught up with upstream server",
3077 }
3078
3079 /*
3080 * When SIGUSR2 arrives, we send any outstanding logs up to the
3081 * shutdown checkpoint record (i.e., the latest record), wait for
3082 * them to be replicated to the standby, and exit. This may be a
3083 * normal termination at shutdown, or a promotion, the walsender
3084 * is not sure which.
3085 */
3086 if (got_SIGUSR2)
3088 }
3089
3090 /* Check for replication timeout. */
3092
3093 /*
3094 * During shutdown, die if the shutdown timeout expires. Call this
3095 * before WalSndComputeSleeptime() so the timeout is considered when
3096 * computing sleep time.
3097 */
3099
3100 /* Send keepalive if the time has come */
3102
3103 /*
3104 * Block if we have unsent data. XXX For logical replication, let
3105 * WalSndWaitForWal() handle any other blocking; idle receivers need
3106 * its additional actions. For physical replication, also block if
3107 * caught up; its send_data does not block.
3108 *
3109 * The IO statistics are reported in WalSndWaitForWal() for the
3110 * logical WAL senders.
3111 */
3115 {
3116 long sleeptime;
3117 int wakeEvents;
3119
3122 else
3123 wakeEvents = 0;
3124
3125 /*
3126 * Use fresh timestamp, not last_processing, to reduce the chance
3127 * of reaching wal_sender_timeout before sending a keepalive.
3128 */
3131
3132 if (pq_is_send_pending())
3134
3135 /* Report IO statistics, if needed */
3138 {
3139 pgstat_flush_io(false);
3141 last_flush = now;
3142 }
3143
3144 /* Sleep until something happens or we time out */
3146 }
3147 }
3148}
char * application_name
Definition guc_tables.c:589
bool pgstat_flush_backend(bool nowait, uint32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition pgstat_io.c:175
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition walsender.c:107
static void WalSndDone(WalSndSendDataCallback send_data)
Definition walsender.c:3770

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg_internal(), fb(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1585 of file walsender.c.

1586{
1587 /* can't have sync rep confused by sending the same LSN several times */
1588 if (!last_write)
1589 lsn = InvalidXLogRecPtr;
1590
1591 resetStringInfo(ctx->out);
1592
1594 pq_sendint64(ctx->out, lsn); /* dataStart */
1595 pq_sendint64(ctx->out, lsn); /* walEnd */
1596
1597 /*
1598 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1599 * reserve space here.
1600 */
1601 pq_sendint64(ctx->out, 0); /* sendtime */
1602}
#define PqReplMsg_WALData
Definition protocol.h:77

References fb(), InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), PqReplMsg_WALData, and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3891 of file walsender.c.

3892{
3893 int i;
3894
3895 for (i = 0; i < max_wal_senders; i++)
3896 {
3898
3899 SpinLockAcquire(&walsnd->mutex);
3900 if (walsnd->pid == 0)
3901 {
3902 SpinLockRelease(&walsnd->mutex);
3903 continue;
3904 }
3905 walsnd->needreload = true;
3906 SpinLockRelease(&walsnd->mutex);
3907 }
3908}

References fb(), i, max_wal_senders, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState * state,
XLogSegNo  nextSegNo,
TimeLineID * tli_p
)
static

Definition at line 3244 of file walsender.c.

3246{
3247 char path[MAXPGPATH];
3248
3249 /*-------
3250 * When reading from a historic timeline, and there is a timeline switch
3251 * within this segment, read from the WAL segment belonging to the new
3252 * timeline.
3253 *
3254 * For example, imagine that this server is currently on timeline 5, and
3255 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3256 * 0/13002088. In pg_wal, we have these files:
3257 *
3258 * ...
3259 * 000000040000000000000012
3260 * 000000040000000000000013
3261 * 000000050000000000000013
3262 * 000000050000000000000014
3263 * ...
3264 *
3265 * In this situation, when requested to send the WAL from segment 0x13, on
3266 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3267 * recovery prefers files from newer timelines, so if the segment was
3268 * restored from the archive on this server, the file belonging to the old
3269 * timeline, 000000040000000000000013, might not exist. Their contents are
3270 * equal up to the switchpoint, because at a timeline switch, the used
3271 * portion of the old segment is copied to the new file.
3272 */
3275 {
3277
3278 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3279 if (nextSegNo == endSegNo)
3281 }
3282
3283 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3284 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3285 if (state->seg.ws_file >= 0)
3286 return;
3287
3288 /*
3289 * If the file is not found, assume it's because the standby asked for a
3290 * too old WAL segment that has already been removed or recycled.
3291 */
3292 if (errno == ENOENT)
3293 {
3294 char xlogfname[MAXFNAMELEN];
3295 int save_errno = errno;
3296
3298 errno = save_errno;
3299 ereport(ERROR,
3301 errmsg("requested WAL segment %s has already been removed",
3302 xlogfname)));
3303 }
3304 else
3305 ereport(ERROR,
3307 errmsg("could not open file \"%s\": %m",
3308 path)));
3309}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition fd.c:1090
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg, ERROR, fb(), MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 4155 of file walsender.c.

4156{
4158
4160
4161 if (walsnd->state == state)
4162 return;
4163
4164 SpinLockAcquire(&walsnd->mutex);
4165 walsnd->state = state;
4166 SpinLockRelease(&walsnd->mutex);
4167}

References am_walsender, Assert, fb(), MyWalSnd, SpinLockAcquire(), and SpinLockRelease().

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

static void WalSndShmemInit ( void * arg)
static

Definition at line 3979 of file walsender.c.

3980{
3981 for (int i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3983
3984 for (int i = 0; i < max_wal_senders; i++)
3985 {
3987
3988 SpinLockInit(&walsnd->mutex);
3989 }
3990
3994}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition ilist.h:314
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv

References ConditionVariableInit(), dlist_init(), fb(), i, max_wal_senders, NUM_SYNC_REP_WAIT_MODE, SpinLockInit(), WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WalSndCtlData::walsnds.

◆ WalSndShmemRequest()

static void WalSndShmemRequest ( void * arg)
static

Definition at line 3965 of file walsender.c.

3966{
3967 Size size;
3968
3969 size = offsetof(WalSndCtlData, walsnds);
3970 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3971 ShmemRequestStruct(.name = "Wal Sender Ctl",
3972 .size = size,
3973 .ptr = (void **) &WalSndCtl,
3974 );
3975}
Size add_size(Size s1, Size s2)
Definition mcxt.c:1733
Size mul_size(Size s1, Size s2)
Definition mcxt.c:1752
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References add_size(), fb(), max_wal_senders, mul_size(), name, ShmemRequestStruct, and WalSndCtl.

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 413 of file walsender.c.

414{
415 /*
416 * Reset whereToSendOutput to prevent ereport from attempting to send any
417 * more messages to the standby.
418 */
421
422 proc_exit(0);
423}

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3946 of file walsender.c.

3947{
3948 /* Set up signal handlers */
3950 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3951 pqsignal(SIGTERM, die); /* request shutdown */
3952 /* SIGQUIT handler was already set up by InitPostmasterChild */
3953 InitializeTimeouts(); /* establishes SIGALRM handler */
3956 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3957 * shutdown */
3958
3959 /* Reset some signals that are accepted by postmaster but not here */
3961}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
#define die(msg)
#define pqsignal
Definition port.h:548
#define PG_SIG_IGN
Definition port.h:552
#define PG_SIG_DFL
Definition port.h:551
void StatementCancelHandler(SIGNAL_ARGS)
Definition postgres.c:3065
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:688
void InitializeTimeouts(void)
Definition timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition walsender.c:3938
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGUSR2
Definition win32_port.h:171

References die, fb(), InitializeTimeouts(), PG_SIG_DFL, PG_SIG_IGN, pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1736 of file walsender.c.

1738{
1739 static TimestampTz sendTime = 0;
1741 bool pending_writes = false;
1742 bool end_xact = ctx->end_xact;
1743
1744 /*
1745 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1746 * avoid flooding the lag tracker when we commit frequently.
1747 *
1748 * We don't have a mechanism to get the ack for any LSN other than end
1749 * xact LSN from the downstream. So, we track lag only for end of
1750 * transaction LSN.
1751 */
1752#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1753 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1755 {
1756 LagTrackerWrite(lsn, now);
1757 sendTime = now;
1758 }
1759
1760 /*
1761 * When skipping empty transactions in synchronous replication, we send a
1762 * keepalive message to avoid delaying such transactions.
1763 *
1764 * It is okay to check sync_standbys_status without lock here as in the
1765 * worst case we will just send an extra keepalive message when it is
1766 * really not required.
1767 */
1768 if (skipped_xact &&
1769 SyncRepRequested() &&
1770 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1771 {
1772 WalSndKeepalive(false, lsn);
1773
1774 /* Try to flush pending output to the client */
1775 if (pq_flush_if_writable() != 0)
1777
1778 /* If we have pending write here, make sure it's actually flushed */
1779 if (pq_is_send_pending())
1780 pending_writes = true;
1781 }
1782
1783 /*
1784 * Process pending writes if any or try to send a keepalive if required.
1785 * We don't need to try sending keepalive messages at the transaction end
1786 * as that will be done at a later point in time. This is required only
1787 * for large transactions where we don't send any changes to the
1788 * downstream and the receiver can timeout due to that.
1789 */
1790 if (pending_writes || (!end_xact &&
1792 wal_sender_timeout / 2)))
1794}
#define SyncRepRequested()
Definition syncrep.h:18
static void ProcessPendingWrites(void)
Definition walsender.c:1680
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition walsender.c:4441
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, fb(), GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 4033 of file walsender.c.

4034{
4035 WaitEvent event;
4036
4038
4039 /*
4040 * We use a condition variable to efficiently wake up walsenders in
4041 * WalSndWakeup().
4042 *
4043 * Every walsender prepares to sleep on a shared memory CV. Note that it
4044 * just prepares to sleep on the CV (i.e., adds itself to the CV's
4045 * waitlist), but does not actually wait on the CV (IOW, it never calls
4046 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
4047 * waiting, because we also need to wait for socket events. The processes
4048 * (startup process, walreceiver etc.) wanting to wake up walsenders use
4049 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
4050 * walsenders come out of WaitEventSetWait().
4051 *
4052 * This approach is simple and efficient because one doesn't have to loop
4053 * through all the walsender slots, with a spinlock acquisition and
4054 * release for every iteration, just to wake up only the waiting
4055 * walsenders. It makes WalSndWakeup() callers' life easy.
4056 *
4057 * XXX: A desirable future improvement would be to add support for CVs
4058 * into WaitEventSetWait().
4059 *
4060 * And, we use separate shared memory CVs for physical and logical
4061 * walsenders for selective wake ups, see WalSndWakeup() for more details.
4062 *
4063 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
4064 * until awakened by physical walsenders after the walreceiver confirms
4065 * the receipt of the LSN.
4066 */
4073
4074 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
4075 (event.events & WL_POSTMASTER_DEATH))
4076 {
4078 proc_exit(1);
4079 }
4080
4082}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition libpq.h:66
WaitEventSet * FeBeWaitSet
Definition pqcomm.c:167
uint32 events
ReplicationKind kind
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, fb(), FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndDone(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1886 of file walsender.c.

1887{
1888 int wakeEvents;
1889 uint32 wait_event = 0;
1892
1893 /*
1894 * Fast path to avoid acquiring the spinlock in case we already know we
1895 * have enough WAL available and all the standby servers have confirmed
1896 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1897 * if we're far behind.
1898 */
1901 return RecentFlushPtr;
1902
1903 /*
1904 * Within the loop, we wait for the necessary WALs to be flushed to disk
1905 * first, followed by waiting for standbys to catch up if there are enough
1906 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1907 */
1908 for (;;)
1909 {
1910 bool wait_for_standby_at_stop = false;
1911 long sleeptime;
1913
1914 /* Clear any already-pending wakeups */
1916
1918
1919 /* Process any requests or signals received recently */
1921
1922 /* Check for input from the client */
1924
1925 /*
1926 * If we're shutting down, trigger pending WAL to be written out,
1927 * otherwise we'd possibly end up waiting for WAL that never gets
1928 * written, because walwriter has shut down already.
1929 *
1930 * Note that GetXLogInsertEndRecPtr() is used to obtain the WAL flush
1931 * request location instead of GetXLogInsertRecPtr(). Because if the
1932 * last WAL record ends at a page boundary, GetXLogInsertRecPtr() can
1933 * return an LSN pointing past the page header, which may cause
1934 * XLogFlush() to report an error.
1935 */
1938
1939 /*
1940 * To avoid the scenario where standbys need to catch up to a newer
1941 * WAL location in each iteration, we update our idea of the currently
1942 * flushed position only if we are not waiting for standbys to catch
1943 * up.
1944 */
1946 {
1947 if (!RecoveryInProgress())
1949 else
1951 }
1952
1953 /*
1954 * If postmaster asked us to stop and the standby slots have caught up
1955 * to the flushed position, don't wait anymore.
1956 *
1957 * It's important to do this check after the recomputation of
1958 * RecentFlushPtr, so we can send all remaining data before shutting
1959 * down.
1960 */
1961 if (got_STOPPING)
1962 {
1965 else
1966 break;
1967 }
1968
1969 /*
1970 * We only send regular messages to the client for full decoded
3971 * transactions, but synchronous replication and walsender shutdown
3972 * may be waiting for a later location. So, before sleeping, we
1973 * send a ping containing the flush location. If the receiver is
1974 * otherwise idle, this keepalive will trigger a reply. Processing the
1975 * reply will update these MyWalSnd locations.
1976 */
1977 if (MyWalSnd->flush < sentPtr &&
1978 MyWalSnd->write < sentPtr &&
1981
1982 /*
3983 * Exit the loop if we have already caught up and don't need to wait for
1984 * standby slots.
1985 */
1988 break;
1989
1990 /*
1991 * Waiting for new WAL or waiting for standbys to catch up. Since we
1992 * need to wait, we're now caught up.
1993 */
1994 WalSndCaughtUp = true;
1995
1996 /*
1997 * Try to flush any pending output to the client.
1998 */
1999 if (pq_flush_if_writable() != 0)
2001
2002 /*
2003 * If we have received CopyDone from the client, sent CopyDone
2004 * ourselves, and the output buffer is empty, it's time to exit
2005 * streaming, so fail the current WAL fetch request.
2006 */
2009 break;
2010
2011 /* die if timeout was reached */
2013
2014 /*
2015 * During shutdown, die if the shutdown timeout expires. Call this
2016 * before WalSndComputeSleeptime() so the timeout is considered when
2017 * computing sleep time.
2018 */
2020
2021 /* Send keepalive if the time has come */
2023
2024 /*
2025 * Sleep until something happens or we time out. Also wait for the
2026 * socket becoming writable, if there's still pending output.
2027 * Otherwise we might sit on sendable output data while waiting for
2028 * new WAL to be generated. (But if we have nothing to send, we don't
2029 * want to wake on socket-writable.)
2030 */
2033
2035
2036 if (pq_is_send_pending())
2038
2039 Assert(wait_event != 0);
2040
2041 /* Report IO statistics, if needed */
2044 {
2045 pgstat_flush_io(false);
2047 last_flush = now;
2048 }
2049
2051 }
2052
2053 /* reactivate latch so WalSndLoop knows to continue */
2055 return RecentFlushPtr;
2056}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1858
XLogRecPtr GetXLogInsertEndRecPtr(void)
Definition xlog.c:10108
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801

References Assert, CHECK_FOR_INTERRUPTS, fb(), WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogInsertEndRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckShutdownTimeout(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogFlush(), and XLogRecPtrIsValid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 4117 of file walsender.c.

4118{
4119 for (;;)
4120 {
4121 int i;
4122 bool all_stopped = true;
4123
4124 for (i = 0; i < max_wal_senders; i++)
4125 {
4127
4128 SpinLockAcquire(&walsnd->mutex);
4129
4130 if (walsnd->pid == 0)
4131 {
4132 SpinLockRelease(&walsnd->mutex);
4133 continue;
4134 }
4135
4136 if (walsnd->state != WALSNDSTATE_STOPPING)
4137 {
4138 all_stopped = false;
4139 SpinLockRelease(&walsnd->mutex);
4140 break;
4141 }
4142 SpinLockRelease(&walsnd->mutex);
4143 }
4144
4145 /* safe to leave if confirmation is done for all WAL senders */
4146 if (all_stopped)
4147 return;
4148
4149 pg_usleep(10000L); /* wait for 10 msec */
4150 }
4151}
void pg_usleep(long microsec)
Definition signal.c:53

References fb(), i, max_wal_senders, pg_usleep(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 4012 of file walsender.c.

4013{
4014 /*
4015 * Wake up all the walsenders waiting on WAL being flushed or replayed
4016 * respectively. Note that waiting walsender would have prepared to sleep
4017 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
4018 * before actually waiting.
4019 */
4020 if (physical)
4022
4023 if (logical)
4025}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1612 of file walsender.c.

1614{
1616
1617 /*
1618 * Fill the send timestamp last, so that it is taken as late as possible.
1619 * This is somewhat ugly, but the protocol is set as it's already used for
1620 * several releases by streaming physical replication.
1621 */
1625 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1626 tmpbuf.data, sizeof(int64));
1627
1628 /* output previously gathered data in a CopyData packet */
1630
1632
1633 /* Try to flush pending output to the client */
1634 if (pq_flush_if_writable() != 0)
1636
1637 /* Try taking fast path unless we get too close to walsender timeout. */
1639 wal_sender_timeout / 2) &&
1641 {
1642 return;
1643 }
1644
1645 /* If we have pending write here, go to slow path */
1647}
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, memcpy(), now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), PqMsg_CopyData, ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3632 of file walsender.c.

3633{
3634 XLogRecord *record;
3635 char *errm;
3636
3637 /*
3638 * We'll use the current flush point to determine whether we've caught up.
3639 * This variable is static in order to cache it across calls. Caching is
3640 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3641 * spinlock.
3642 */
3644
3645 /*
3646 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3647 * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3648 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3649 * didn't wait - i.e. when we're shutting down.
3650 */
3651 WalSndCaughtUp = false;
3652
3654
3655 /* xlog record was invalid */
3656 if (errm != NULL)
3657 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3658 errm);
3659
3660 if (record != NULL)
3661 {
3662 /*
3663 * Note the lack of any call to LagTrackerWrite() which is handled by
3664 * WalSndUpdateProgress which is called by output plugin through
3665 * logical decoding write api.
3666 */
3668
3670 }
3671
3672 /*
3673 * If first time through in this session, initialize flushPtr. Otherwise,
3674 * we only need to update flushPtr if EndRecPtr is past it.
3675 */
3678 {
3679 /*
3680 * For cascading logical WAL senders, we use the replay LSN instead of
3681 * the flush LSN, since logical decoding on a standby only processes
3682 * WAL that has been replayed. This distinction becomes particularly
3683 * important during shutdown, as new WAL is no longer replayed and the
3684 * last replayed LSN marks the furthest point up to which decoding can
3685 * proceed.
3686 */
3689 else
3691 }
3692
3693 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3695 WalSndCaughtUp = true;
3696
3697 /*
3698 * If we're caught up and have been requested to stop, have WalSndLoop()
3699 * terminate the connection in an orderly manner, after writing out all
3700 * the pending data.
3701 */
3703 got_SIGUSR2 = true;
3704
3705 /* Update shared memory status */
3706 {
3708
3709 SpinLockAcquire(&walsnd->mutex);
3710 walsnd->sentPtr = sentPtr;
3711 SpinLockRelease(&walsnd->mutex);
3712 }
3713}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition decode.c:89
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:391

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, fb(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), MyWalSnd, LogicalDecodingContext::reader, sentPtr, SpinLockAcquire(), SpinLockRelease(), WalSndCaughtUp, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3322 of file walsender.c.

3323{
3325 XLogRecPtr startptr;
3326 XLogRecPtr endptr;
3327 Size nbytes;
3328 XLogSegNo segno;
3330 Size rbytes;
3331
3332 /* If requested switch the WAL sender to the stopping state. */
3333 if (got_STOPPING)
3335
3337 {
3338 WalSndCaughtUp = true;
3339 return;
3340 }
3341
3342 /* Figure out how far we can safely send the WAL. */
3344 {
3345 /*
3346 * Streaming an old timeline that's in this server's history, but is
3347 * not the one we're currently inserting or replaying. It can be
3348 * streamed up to the point where we switched off that timeline.
3349 */
3351 }
3352 else if (am_cascading_walsender)
3353 {
3355
3356 /*
3357 * Streaming the latest timeline on a standby.
3358 *
3359 * Attempt to send all WAL that has already been replayed, so that we
3360 * know it's valid. If we're receiving WAL through streaming
3361 * replication, it's also OK to send any WAL that has been received
3362 * but not replayed.
3363 *
3364 * The timeline we're recovering from can change, or we can be
3365 * promoted. In either case, the current timeline becomes historic. We
3366 * need to detect that so that we don't try to stream past the point
3367 * where we switched to another timeline. We check for promotion or
3368 * timeline switch after calculating FlushPtr, to avoid a race
3369 * condition: if the timeline becomes historic just after we checked
3370 * that it was still current, it's still OK to stream it up to the
3371 * FlushPtr that was calculated before it became historic.
3372 */
3373 bool becameHistoric = false;
3374
3376
3377 if (!RecoveryInProgress())
3378 {
3379 /* We have been promoted. */
3381 am_cascading_walsender = false;
3382 becameHistoric = true;
3383 }
3384 else
3385 {
3386 /*
3387 * Still a cascading standby. But is the timeline we're sending
3388 * still the one recovery is recovering from?
3389 */
3391 becameHistoric = true;
3392 }
3393
3394 if (becameHistoric)
3395 {
3396 /*
3397 * The timeline we were sending has become historic. Read the
3398 * timeline history file of the new timeline to see where exactly
3399 * we forked off from the timeline we were sending.
3400 */
3401 List *history;
3402
3405
3408
3410
3412 }
3413 }
3414 else
3415 {
3416 /*
3417 * Streaming the current timeline on a primary.
3418 *
3419 * Attempt to send all data that's already been written out and
3420 * fsync'd to disk. We cannot go further than what's been written out
3421 * given the current implementation of WALRead(). And in any case
3422 * it's unsafe to send WAL that is not securely down to disk on the
3423 * primary: if the primary subsequently crashes and restarts, standbys
3424 * must not have applied any WAL that got lost on the primary.
3425 */
3427 }
3428
3429 /*
3430 * Record the current system time as an approximation of the time at which
3431 * this WAL location was written for the purposes of lag tracking.
3432 *
3433 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3434 * is flushed and we could get that time as well as the LSN when we call
3435 * GetFlushRecPtr() above (and likewise for the cascading standby
3436 * equivalent), but rather than putting any new code into the hot WAL path
3437 * it seems good enough to capture the time here. We should reach this
3438 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3439 * may take some time, we read the WAL flush pointer and take the time
3440 * very close together here so that we'll get a later position if it is
3441 * still moving.
3442 *
3443 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3444 * this gives us a cheap approximation for the WAL flush time for this
3445 * LSN.
3446 *
3447 * Note that the LSN is not necessarily the LSN for the data contained in
3448 * the present message; it's the end of the WAL, which might be further
3449 * ahead. All the lag tracking machinery cares about is finding out when
3450 * that arbitrary LSN is eventually reported as written, flushed and
3451 * applied, so that it can measure the elapsed time.
3452 */
3454
3455 /*
3456 * If this is a historic timeline and we've reached the point where we
3457 * forked to the next timeline, stop streaming.
3458 *
3459 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3460 * startup process will normally replay all WAL that has been received
3461 * from the primary, before promoting, but if the WAL streaming is
3462 * terminated at a WAL page boundary, the valid portion of the timeline
3463 * might end in the middle of a WAL record. We might've already sent the
3464 * first half of that partial WAL record to the cascading standby, so that
3465 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3466 * replay the partial WAL record either, so it can still follow our
3467 * timeline switch.
3468 */
3470 {
3471 /* close the current file. */
3472 if (xlogreader->seg.ws_file >= 0)
3474
3475 /* Send CopyDone */
3477 streamingDoneSending = true;
3478
3479 WalSndCaughtUp = true;
3480
3481 elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
3484 return;
3485 }
3486
3487 /* Do we have any work to do? */
3489 if (SendRqstPtr <= sentPtr)
3490 {
3491 WalSndCaughtUp = true;
3492 return;
3493 }
3494
3495 /*
3496 * Figure out how much to send in one message. If there's no more than
3497 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3498 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3499 *
3500 * The rounding is not only for performance reasons. Walreceiver relies on
3501 * the fact that we never split a WAL record across two messages. Since a
3502 * long WAL record is split at page boundary into continuation records,
3503 * page boundary is always a safe cut-off point. We also assume that
3504 * SendRqstPtr never points to the middle of a WAL record.
3505 */
3506 startptr = sentPtr;
3507 endptr = startptr;
3508 endptr += MAX_SEND_SIZE;
3509
3510 /* if we went beyond SendRqstPtr, back off */
3511 if (SendRqstPtr <= endptr)
3512 {
3513 endptr = SendRqstPtr;
3515 WalSndCaughtUp = false;
3516 else
3517 WalSndCaughtUp = true;
3518 }
3519 else
3520 {
3521 /* round down to page boundary. */
3522 endptr -= (endptr % XLOG_BLCKSZ);
3523 WalSndCaughtUp = false;
3524 }
3525
3526 nbytes = endptr - startptr;
3527 Assert(nbytes <= MAX_SEND_SIZE);
3528
3529 /*
3530 * OK to read and send the slice.
3531 */
3534
3535 pq_sendint64(&output_message, startptr); /* dataStart */
3536 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3537 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3538
3539 /*
3540 * Read the log directly into the output buffer to avoid extra memcpy
3541 * calls.
3542 */
3544
3545retry:
3546 /* attempt to read WAL from WAL buffers first */
3548 startptr, nbytes, xlogreader->seg.ws_tli);
3550 startptr += rbytes;
3551 nbytes -= rbytes;
3552
3553 /* now read the remaining WAL from WAL file */
3554 if (nbytes > 0 &&
3557 startptr,
3558 nbytes,
3559 xlogreader->seg.ws_tli, /* Pass the current TLI because
3560 * only WalSndSegmentOpen controls
3561 * whether new TLI is needed. */
3562 &errinfo))
3564
3565 /* See logical_read_xlog_page(). */
3566 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3568
3569 /*
3570 * During recovery, the currently-open WAL file might be replaced with the
3571 * file of the same name retrieved from archive. So we always need to
3572 * check what we read was valid after reading into the buffer. If it's
3573 * invalid, we try to open and read the file again.
3574 */
3576 {
3578 bool reload;
3579
3580 SpinLockAcquire(&walsnd->mutex);
3581 reload = walsnd->needreload;
3582 walsnd->needreload = false;
3583 SpinLockRelease(&walsnd->mutex);
3584
3585 if (reload && xlogreader->seg.ws_file >= 0)
3586 {
3588
3589 goto retry;
3590 }
3591 }
3592
3593 output_message.len += nbytes;
3595
3596 /*
3597 * Fill the send timestamp last, so that it is taken as late as possible.
3598 */
3601 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3602 tmpbuf.data, sizeof(int64));
3603
3605
3606 sentPtr = endptr;
3607
3608 /* Update shared memory status */
3609 {
3611
3612 SpinLockAcquire(&walsnd->mutex);
3613 walsnd->sentPtr = sentPtr;
3614 SpinLockRelease(&walsnd->mutex);
3615 }
3616
3617 /* Report progress of XLOG streaming in PS display */
3619 {
3620 char activitymsg[50];
3621
3622 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
3625 }
3626}
bool update_process_title
Definition ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition stringinfo.c:337
TimeLineID ws_tli
Definition xlogreader.h:49
WALSegmentContext segcxt
Definition xlogreader.h:270
#define MAX_SEND_SIZE
Definition walsender.c:118
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition xlog.c:1789

References am_cascading_walsender, Assert, CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), fb(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, memcpy(), MyWalSnd, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqMsg_CopyDone, PqReplMsg_WALData, readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 138 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 279 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 243 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

◆ replication_active

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ shutdown_request_timestamp

TimestampTz shutdown_request_timestamp = 0
static

Definition at line 210 of file walsender.c.

Referenced by WalSndCheckShutdownTimeout(), and WalSndComputeSleeptime().

◆ shutdown_stream_done_queued

bool shutdown_stream_done_queued = false
static

Definition at line 217 of file walsender.c.

Referenced by WalSndDone(), and WalSndDoneImmediate().

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 226 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 172 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 173 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 155 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_shutdown_timeout

int wal_sender_shutdown_timeout = -1

Definition at line 146 of file walsender.c.

Referenced by WalSndCheckShutdownTimeout(), and WalSndComputeSleeptime().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ WalSndShmemCallbacks

const ShmemCallbacks WalSndShmemCallbacks
Initial value:
= {
.request_fn = WalSndShmemRequest,
.init_fn = WalSndShmemInit,
}
static void WalSndShmemRequest(void *arg)
Definition walsender.c:3965
static void WalSndShmemInit(void *arg)
Definition walsender.c:3979

Definition at line 126 of file walsender.c.

126 {
127     .request_fn = WalSndShmemRequest,
128     .init_fn = WalSndShmemInit,
129 };

◆ xlogreader