PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessStandbyPSRequestMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 226 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 114 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 103 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 258 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd * cmd)
static

Definition at line 1415 of file walsender.c.

1416{
1417 bool failover_given = false;
1418 bool two_phase_given = false;
1419 bool failover;
1420 bool two_phase;
1421
1422 /* Parse options */
1424 {
1425 if (strcmp(defel->defname, "failover") == 0)
1426 {
1427 if (failover_given)
1428 ereport(ERROR,
1430 errmsg("conflicting or redundant options")));
1431 failover_given = true;
1433 }
1434 else if (strcmp(defel->defname, "two_phase") == 0)
1435 {
1436 if (two_phase_given)
1437 ereport(ERROR,
1439 errmsg("conflicting or redundant options")));
1440 two_phase_given = true;
1442 }
1443 else
1444 elog(ERROR, "unrecognized option: %s", defel->defname);
1445 }
1446
1450}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static bool two_phase
static bool failover
static int fb(int x)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:949

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, failover, fb(), foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1194 of file walsender.c.

1195{
1196 const char *snapshot_name = NULL;
1197 char xloc[MAXFNAMELEN];
1198 char *slot_name;
1199 bool reserve_wal = false;
1200 bool two_phase = false;
1201 bool failover = false;
1205 TupleDesc tupdesc;
1206 Datum values[4];
1207 bool nulls[4] = {0};
1208
1210
1212 &failover);
1213
1214 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1215 {
1216 ReplicationSlotCreate(cmd->slotname, false,
1218 false, false, false);
1219
1220 if (reserve_wal)
1221 {
1223
1225
1226 /* Write this slot to disk if it's a permanent one. */
1227 if (!cmd->temporary)
1229 }
1230 }
1231 else
1232 {
1234 bool need_full_snapshot = false;
1235
1237
1239
1240 /*
1241 * Initially create persistent slot as ephemeral - that allows us to
1242 * nicely handle errors during initialization because it'll get
1243 * dropped if this transaction fails. We'll make it persistent at the
1244 * end. Temporary slots can be created as temporary from beginning as
1245 * they get dropped on error as well.
1246 */
1250
1251 /*
1252 * Do options check early so that we can bail before calling the
1253 * DecodingContextFindStartpoint which can take long time.
1254 */
1256 {
1257 if (IsTransactionBlock())
1258 ereport(ERROR,
1259 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1260 (errmsg("%s must not be called inside a transaction",
1261 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1262
1263 need_full_snapshot = true;
1264 }
1266 {
1267 if (!IsTransactionBlock())
1268 ereport(ERROR,
1269 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1270 (errmsg("%s must be called inside a transaction",
1271 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1272
1274 ereport(ERROR,
1275 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1276 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1277 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1278 if (!XactReadOnly)
1279 ereport(ERROR,
1280 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1281 (errmsg("%s must be called in a read-only transaction",
1282 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1283
1284 if (FirstSnapshotSet)
1285 ereport(ERROR,
1286 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1287 (errmsg("%s must be called before any query",
1288 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1289
1290 if (IsSubTransaction())
1291 ereport(ERROR,
1292 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1293 (errmsg("%s must not be called in a subtransaction",
1294 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1295
1296 need_full_snapshot = true;
1297 }
1298
1299 /*
1300 * Ensure the logical decoding is enabled before initializing the
1301 * logical decoding context.
1302 */
1305
1309 .segment_open = WalSndSegmentOpen,
1310 .segment_close = wal_segment_close),
1313
1314 /*
1315 * Signal that we don't need the timeout mechanism. We're just
1316 * creating the replication slot and don't yet accept feedback
1317 * messages or send keepalives. As we possibly need to wait for
1318 * further WAL the walsender would otherwise possibly be killed too
1319 * soon.
1320 */
1322
1323 /* build initial snapshot, might take a while */
1325
1326 /*
1327 * Export or use the snapshot if we've been asked to do so.
1328 *
1329 * NB. We will convert the snapbuild.c kind of snapshot to normal
1330 * snapshot when doing this.
1331 */
1333 {
1335 }
1337 {
1338 Snapshot snap;
1339
1342 }
1343
1344 /* don't need the decoding context anymore */
1346
1347 if (!cmd->temporary)
1349 }
1350
1351 snprintf(xloc, sizeof(xloc), "%X/%08X",
1353
1355
1356 /*----------
1357 * Need a tuple descriptor representing four columns:
1358 * - first field: the slot name
1359 * - second field: LSN at which we became consistent
1360 * - third field: exported snapshot's name
1361 * - fourth field: output plugin
1362 */
1363 tupdesc = CreateTemplateTupleDesc(4);
1364 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1365 TEXTOID, -1, 0);
1366 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1367 TEXTOID, -1, 0);
1368 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1369 TEXTOID, -1, 0);
1370 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1371 TEXTOID, -1, 0);
1372
1373 /* prepare for projection of tuples */
1375
1376 /* slot_name */
1377 slot_name = NameStr(MyReplicationSlot->data.name);
1378 values[0] = CStringGetTextDatum(slot_name);
1379
1380 /* consistent wal location */
1382
1383 /* snapshot name, or NULL if none */
1384 if (snapshot_name != NULL)
1386 else
1387 nulls[2] = true;
1388
1389 /* plugin, or NULL if none */
1390 if (cmd->plugin != NULL)
1392 else
1393 nulls[3] = true;
1394
1395 /* send it to dest */
1396 do_tup_output(tstate, values, nulls);
1398
1400}
int16 AttrNumber
Definition attnum.h:21
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
#define NameStr(name)
Definition c.h:765
#define Assert(condition)
Definition c.h:873
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition dest.c:113
@ DestRemoteSimple
Definition dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define false
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:668
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:624
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:321
void CheckLogicalDecodingRequirements(void)
Definition logical.c:111
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:204
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:305
#define NIL
Definition pg_list.h:68
#define snprintf
Definition port.h:260
uint64_t Datum
Definition postgres.h:70
@ REPLICATION_KIND_PHYSICAL
Definition replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition slot.c:378
void ReplicationSlotMarkDirty(void)
Definition slot.c:1173
void ReplicationSlotReserveWal(void)
Definition slot.c:1693
void ReplicationSlotPersist(void)
Definition slot.c:1190
ReplicationSlot * MyReplicationSlot
Definition slot.c:148
void ReplicationSlotSave(void)
Definition slot.c:1155
void ReplicationSlotRelease(void)
Definition slot.c:758
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:440
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition snapbuild.c:538
bool FirstSnapshotSet
Definition snapmgr.c:193
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
Definition snapmgr.c:1853
PGPROC * MyProc
Definition proc.c:67
ReplicationKind kind
Definition replnodes.h:56
struct SnapBuild * snapshot_builder
Definition logical.h:44
ReplicationSlotPersistentData data
Definition slot.h:210
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:182
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:918
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition walsender.c:1117
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition walsender.c:3113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1577
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition walsender.c:1673
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition walsender.c:1044
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1550
static TimestampTz last_reply_timestamp
Definition walsender.c:187
CRSSnapshotAction
Definition walsender.h:21
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition walsender.h:22
bool XactReadOnly
Definition xact.c:83
int XactIsoLevel
Definition xact.c:80
bool IsSubTransaction(void)
Definition xact.c:5066
bool IsTransactionBlock(void)
Definition xact.c:4993
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), EnsureLogicalDecodingEnabled(), ereport, errmsg(), ERROR, failover, fb(), FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsLogicalDecodingEnabled(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1406 of file walsender.c.

1407{
1408 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1409}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:909

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char * cmd_string)

Definition at line 1994 of file walsender.c.

1995{
1996 yyscan_t scanner;
1997 int parse_rc;
1998 Node *cmd_node;
1999 const char *cmdtag;
2001
2002 /* We save and re-use the cmd_context across calls */
2004
2005 /*
2006 * If WAL sender has been told that shutdown is getting close, switch its
2007 * status accordingly to handle the next replication commands correctly.
2008 */
2009 if (got_STOPPING)
2011
2012 /*
2013 * Throw error if in stopping mode. We need prevent commands that could
2014 * generate WAL while the shutdown checkpoint is being written. To be
2015 * safe, we just prohibit all new commands.
2016 */
2018 ereport(ERROR,
2020 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2021
2022 /*
2023 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2024 * command arrives. Clean up the old stuff if there's anything.
2025 */
2027
2029
2030 /*
2031 * Prepare to parse and execute the command.
2032 *
2033 * Because replication command execution can involve beginning or ending
2034 * transactions, we need a working context that will survive that, so we
2035 * make it a child of TopMemoryContext. That in turn creates a hazard of
2036 * long-lived memory leaks if we lose track of the working context. We
2037 * deal with that by creating it only once per walsender, and resetting it
2038 * for each new command. (Normally this reset is a no-op, but if the
2039 * prior exec_replication_command call failed with an error, it won't be.)
2040 *
2041 * This is subtler than it looks. The transactions we manage can extend
2042 * across replication commands, indeed SnapBuildClearExportedSnapshot
2043 * might have just ended one. Because transaction exit will revert to the
2044 * memory context that was current at transaction start, we need to be
2045 * sure that that context is still valid. That motivates re-using the
2046 * same cmd_context rather than making a new one each time.
2047 */
2048 if (cmd_context == NULL)
2050 "Replication command context",
2052 else
2054
2056
2058
2059 /*
2060 * Is it a WalSender command?
2061 */
2063 {
2064 /* Nope; clean up and get out. */
2066
2069
2070 /* XXX this is a pretty random place to make this check */
2071 if (MyDatabaseId == InvalidOid)
2072 ereport(ERROR,
2074 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2075
2076 /* Tell the caller that this wasn't a WalSender command. */
2077 return false;
2078 }
2079
2080 /*
2081 * Looks like a WalSender command, so parse it.
2082 */
2084 if (parse_rc != 0)
2085 ereport(ERROR,
2087 errmsg_internal("replication command parser returned %d",
2088 parse_rc)));
2090
2091 /*
2092 * Report query to various monitoring facilities. For this purpose, we
2093 * report replication commands just like SQL commands.
2094 */
2096
2098
2099 /*
2100 * Log replication command if log_replication_commands is enabled. Even
2101 * when it's disabled, log the command with DEBUG1 level for backward
2102 * compatibility.
2103 */
2105 (errmsg("received replication command: %s", cmd_string)));
2106
2107 /*
2108 * Disallow replication commands in aborted transaction blocks.
2109 */
2111 ereport(ERROR,
2113 errmsg("current transaction is aborted, "
2114 "commands ignored until end of transaction block")));
2115
2117
2118 /*
2119 * Allocate buffers that will be used for each outgoing and incoming
2120 * message. We do this just once per command to reduce palloc overhead.
2121 */
2125
2126 switch (cmd_node->type)
2127 {
2129 cmdtag = "IDENTIFY_SYSTEM";
2133 break;
2134
2136 cmdtag = "READ_REPLICATION_SLOT";
2140 break;
2141
2142 case T_BaseBackupCmd:
2143 cmdtag = "BASE_BACKUP";
2148 break;
2149
2151 cmdtag = "CREATE_REPLICATION_SLOT";
2155 break;
2156
2158 cmdtag = "DROP_REPLICATION_SLOT";
2162 break;
2163
2165 cmdtag = "ALTER_REPLICATION_SLOT";
2169 break;
2170
2172 {
2174
2175 cmdtag = "START_REPLICATION";
2178
2179 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2180 StartReplication(cmd);
2181 else
2183
2184 /* dupe, but necessary per libpqrcv_endstreaming */
2186
2188 break;
2189 }
2190
2192 cmdtag = "TIMELINE_HISTORY";
2197 break;
2198
2199 case T_VariableShowStmt:
2200 {
2203
2204 cmdtag = "SHOW";
2206
2207 /* syscache access needs a transaction environment */
2209 GetPGVariable(n->name, dest);
2212 }
2213 break;
2214
2216 cmdtag = "UPLOAD_MANIFEST";
2221 break;
2222
2223 default:
2224 elog(ERROR, "unrecognized replication command node tag: %u",
2225 cmd_node->type);
2226 }
2227
2228 /*
2229 * Done. Revert to caller's memory context, and clean out the cmd_context
2230 * to recover memory right away.
2231 */
2234
2235 /*
2236 * We need not update ps display or pg_stat_activity, because PostgresMain
2237 * will reset those to "idle". But we must reset debug_query_string to
2238 * ensure it doesn't become a dangling pointer.
2239 */
2241
2242 return true;
2243}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition basebackup.c:990
void * yyscan_t
Definition cubedata.h:65
void EndReplicationCommand(const char *commandTag)
Definition dest.c:205
int errmsg_internal(const char *fmt,...)
Definition elog.c:1170
#define LOG
Definition elog.h:31
#define DEBUG1
Definition elog.h:30
Oid MyDatabaseId
Definition globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition guc_funcs.c:408
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopMemoryContext
Definition mcxt.c:166
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const char * debug_query_string
Definition postgres.c:89
#define InvalidOid
static void set_ps_display(const char *activity)
Definition ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
void replication_scanner_finish(yyscan_t yyscanner)
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
void SnapBuildClearExportedSnapshot(void)
Definition snapbuild.c:599
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition nodes.h:135
ReplicationKind kind
Definition replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition walsender.c:1415
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition walsender.c:580
WalSnd * MyWalSnd
Definition walsender.c:120
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition walsender.c:481
static StringInfoData tmpbuf
Definition walsender.c:178
static void IdentifySystem(void)
Definition walsender.c:400
static StringInfoData reply_message
Definition walsender.c:177
void WalSndSetState(WalSndState state)
Definition walsender.c:3942
static StringInfoData output_message
Definition walsender.c:176
static void UploadManifest(void)
Definition walsender.c:670
static volatile sig_atomic_t got_STOPPING
Definition walsender.c:206
bool log_replication_commands
Definition walsender.c:133
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition walsender.c:1194
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition walsender.c:1457
static IncrementalBackupInfo * uploaded_manifest
Definition walsender.c:155
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition walsender.c:1406
static void StartReplication(StartReplicationCmd *cmd)
Definition walsender.c:812
static XLogReaderState * xlogreader
Definition walsender.c:145
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3669
void StartTransactionCommand(void)
Definition xact.c:3080
bool IsAbortedTransactionBlockState(void)
Definition xact.c:408
void CommitTransactionCommand(void)
Definition xact.c:3178

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, fb(), GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)

Definition at line 3637 of file walsender.c.

3638{
3640 TimeLineID replayTLI;
3643 XLogRecPtr result;
3644
3646
3647 /*
3648 * We can safely send what's already been replayed. Also, if walreceiver
3649 * is streaming WAL from the same timeline, we can send anything that it
3650 * has streamed, but hasn't been replayed yet.
3651 */
3652
3654 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3655
3656 if (tli)
3657 *tli = replayTLI;
3658
3659 result = replayPtr;
3660 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3661 result = receivePtr;
3662
3663 return result;
3664}
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1883
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition walsender.c:124
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, fb(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t * offset,
IncrementalBackupInfo * ib 
)
static

Definition at line 736 of file walsender.c.

738{
739 int mtype;
740 int maxmsglen;
741
743
745 mtype = pq_getbyte();
746 if (mtype == EOF)
749 errmsg("unexpected EOF on client connection with an open transaction")));
750
751 switch (mtype)
752 {
753 case PqMsg_CopyData:
755 break;
756 case PqMsg_CopyDone:
757 case PqMsg_CopyFail:
758 case PqMsg_Flush:
759 case PqMsg_Sync:
761 break;
762 default:
765 errmsg("unexpected message type 0x%02X during COPY from stdin",
766 mtype)));
767 maxmsglen = 0; /* keep compiler quiet */
768 break;
769 }
770
771 /* Now collect the message body */
775 errmsg("unexpected EOF on client connection with an open transaction")));
777
778 /* Process the message */
779 switch (mtype)
780 {
781 case PqMsg_CopyData:
783 return true;
784
785 case PqMsg_CopyDone:
786 return false;
787
788 case PqMsg_Sync:
789 case PqMsg_Flush:
790 /* Ignore these while in CopyOut mode as we do elsewhere. */
791 return true;
792
793 case PqMsg_CopyFail:
796 errmsg("COPY from stdin failed: %s",
798 }
799
800 /* Not reached. */
801 Assert(false);
802 return false;
803}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition miscadmin.h:142
#define RESUME_CANCEL_INTERRUPTS()
Definition miscadmin.h:144
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pq_getmessage(StringInfo s, int maxlen)
Definition pqcomm.c:1203
int pq_getbyte(void)
Definition pqcomm.c:963
void pq_startmsgread(void)
Definition pqcomm.c:1141
const char * pq_getmsgstring(StringInfo msg)
Definition pqformat.c:578
#define PqMsg_CopyDone
Definition protocol.h:64
#define PqMsg_CopyData
Definition protocol.h:65
#define PqMsg_Sync
Definition protocol.h:27
#define PqMsg_CopyFail
Definition protocol.h:29
#define PqMsg_Flush
Definition protocol.h:24

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), errmsg(), ERROR, fb(), HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_CopyFail, PqMsg_Flush, PqMsg_Sync, and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3693 of file walsender.c.

3694{
3696
3697 /*
3698 * If replication has not yet started, die like with SIGTERM. If
3699 * replication is active, only set a flag and wake up the main loop. It
3700 * will send any outstanding WAL, wait for it to be replicated to the
3701 * standby, and then exit gracefully.
3702 */
3703 if (!replication_active)
3705 else
3706 got_STOPPING = true;
3707}
int MyProcPid
Definition globals.c:47
bool am_walsender
Definition walsender.c:123
static volatile sig_atomic_t replication_active
Definition walsender.c:214
#define kill(pid, sig)
Definition win32_port.h:490

References am_walsender, Assert, fb(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 400 of file walsender.c.

401{
402 char sysid[32];
403 char xloc[MAXFNAMELEN];
405 char *dbname = NULL;
408 TupleDesc tupdesc;
409 Datum values[4];
410 bool nulls[4] = {0};
411 TimeLineID currTLI;
412
413 /*
414 * Reply with a result set with one row, four columns. First col is system
415 * ID, second is timeline ID, third is current xlog location and the
416 * fourth contains the database name if we are connected to one.
417 */
418
421
424 logptr = GetStandbyFlushRecPtr(&currTLI);
425 else
426 logptr = GetFlushRecPtr(&currTLI);
427
428 snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
429
431 {
433
434 /* syscache access needs a transaction env. */
437 /* copy dbname out of TX context */
440 }
441
443
444 /* need a tuple descriptor representing four columns */
445 tupdesc = CreateTemplateTupleDesc(4);
446 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
447 TEXTOID, -1, 0);
448 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
449 INT8OID, -1, 0);
450 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
451 TEXTOID, -1, 0);
452 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
453 TEXTOID, -1, 0);
454
455 /* prepare for projection of tuples */
457
458 /* column 1: system identifier */
460
461 /* column 2: timeline */
462 values[1] = Int64GetDatum(currTLI);
463
464 /* column 3: wal location */
466
467 /* column 4: database name, or NULL if none */
468 if (dbname)
470 else
471 nulls[3] = true;
472
473 /* send it to dest */
474 do_tup_output(tstate, values, nulls);
475
477}
#define UINT64_FORMAT
Definition c.h:565
struct cursor * cur
Definition ecpg.c:29
char * get_database_name(Oid dbid)
Definition lsyscache.c:1242
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
static Datum Int64GetDatum(int64 X)
Definition postgres.h:423
char * dbname
Definition streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition walsender.c:3637
uint64 GetSystemIdentifier(void)
Definition xlog.c:4627
bool RecoveryInProgress(void)
Definition xlog.c:6460
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6625

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), fb(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 301 of file walsender.c.

302{
304
305 /* Create a per-walsender data structure in shared memory */
307
308 /* need resource owner for e.g. basebackups */
310
311 /*
312 * Let postmaster know that we're a WAL sender. Once we've declared us as
313 * a WAL sender process, postmaster will let us outlive the bgwriter and
314 * kill us last in the shutdown sequence, so we get a chance to stream all
315 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
316 * there's no going back, and we mustn't write any WAL records after this.
317 */
320
321 /*
322 * If the client didn't specify a database to connect to, show in PGPROC
323 * that our advertised xmin should affect vacuum horizons in all
324 * databases. This allows physical replication clients to send hot
325 * standby feedback that will delay vacuum cleanup in all databases.
326 */
328 {
334 }
335
336 /* Initialize empty timestamp buffer for lag tracking. */
338}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1266
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition resowner.c:996
PROC_HDR * ProcGlobal
Definition proc.c:79
TransactionId xmin
Definition proc.h:194
uint8 statusFlags
Definition proc.h:265
int pgxactoff
Definition proc.h:201
uint8 * statusFlags
Definition proc.h:409
#define InvalidTransactionId
Definition transam.h:31
static void InitWalSenderSlot(void)
Definition walsender.c:3021
static LagTracker * lag_tracker
Definition walsender.c:252

References am_cascading_walsender, Assert, CreateAuxProcessResourceOwner(), fb(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 3021 of file walsender.c.

3022{
3023 int i;
3024
3025 /*
3026 * WalSndCtl should be set up already (we inherit this by fork() or
3027 * EXEC_BACKEND mechanism from the postmaster).
3028 */
3029 Assert(WalSndCtl != NULL);
3030 Assert(MyWalSnd == NULL);
3031
3032 /*
3033 * Find a free walsender slot and reserve it. This must not fail due to
3034 * the prior check for free WAL senders in InitProcess().
3035 */
3036 for (i = 0; i < max_wal_senders; i++)
3037 {
3039
3040 SpinLockAcquire(&walsnd->mutex);
3041
3042 if (walsnd->pid != 0)
3043 {
3044 SpinLockRelease(&walsnd->mutex);
3045 continue;
3046 }
3047 else
3048 {
3049 /*
3050 * Found a free slot. Reserve it for us.
3051 */
3052 walsnd->pid = MyProcPid;
3053 walsnd->state = WALSNDSTATE_STARTUP;
3054 walsnd->sentPtr = InvalidXLogRecPtr;
3055 walsnd->needreload = false;
3056 walsnd->write = InvalidXLogRecPtr;
3057 walsnd->flush = InvalidXLogRecPtr;
3058 walsnd->apply = InvalidXLogRecPtr;
3059 walsnd->writeLag = -1;
3060 walsnd->flushLag = -1;
3061 walsnd->applyLag = -1;
3062 walsnd->sync_standby_priority = 0;
3063 walsnd->replyTime = 0;
3064
3065 /*
3066 * The kind assignment is done here and not in StartReplication()
3067 * and StartLogicalReplication(). Indeed, the logical walsender
3068 * needs to read WAL records (like snapshot of running
3069 * transactions) during the slot creation. So it needs to be woken
3070 * up based on its kind.
3071 *
3072 * The kind assignment could also be done in StartReplication(),
3073 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
3074 * seems better to set it in one place.
3075 */
3076 if (MyDatabaseId == InvalidOid)
3078 else
3080
3081 SpinLockRelease(&walsnd->mutex);
3082 /* don't need the lock anymore */
3083 MyWalSnd = walsnd;
3084
3085 break;
3086 }
3087 }
3088
3089 Assert(MyWalSnd != NULL);
3090
3091 /* Arrange to clean up at walsender exit */
3093}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
#define SpinLockRelease(lock)
Definition spin.h:61
#define SpinLockAcquire(lock)
Definition spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
int max_wal_senders
Definition walsender.c:129
static void WalSndKill(int code, Datum arg)
Definition walsender.c:3097
WalSndCtlData * WalSndCtl
Definition walsender.c:117
@ WALSNDSTATE_STARTUP

References Assert, fb(), i, InvalidOid, InvalidXLogRecPtr, max_wal_senders, MyDatabaseId, MyProcPid, MyWalSnd, on_shmem_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, SpinLockAcquire, SpinLockRelease, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, and WALSNDSTATE_STARTUP.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4286 of file walsender.c.

4287{
4288 TimestampTz time = 0;
4289
4290 /*
4291 * If 'lsn' has not passed the WAL position stored in the overflow entry,
4292 * return the elapsed time (in microseconds) since the saved local flush
4293 * time. If the flush time is in the future (due to clock drift), return
4294 * -1 to treat as no valid sample.
4295 *
4296 * Otherwise, switch back to using the buffer to control the read head and
4297 * compute the elapsed time. The read head is then reset to point to the
4298 * oldest entry in the buffer.
4299 */
4300 if (lag_tracker->read_heads[head] == -1)
4301 {
4302 if (lag_tracker->overflowed[head].lsn > lsn)
4303 return (now >= lag_tracker->overflowed[head].time) ?
4304 now - lag_tracker->overflowed[head].time : -1;
4305
4306 time = lag_tracker->overflowed[head].time;
4308 lag_tracker->read_heads[head] =
4310 }
4311
4312 /* Read all unread samples up to this LSN or end of buffer. */
4313 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4315 {
4317 lag_tracker->last_read[head] =
4319 lag_tracker->read_heads[head] =
4321 }
4322
4323 /*
4324 * If the lag tracker is empty, that means the standby has processed
4325 * everything we've ever sent so we should now clear 'last_read'. If we
4326 * didn't do that, we'd risk using a stale and irrelevant sample for
4327 * interpolation at the beginning of the next burst of WAL after a period
4328 * of idleness.
4329 */
4331 lag_tracker->last_read[head].time = 0;
4332
4333 if (time > now)
4334 {
4335 /* If the clock somehow went backwards, treat as not found. */
4336 return -1;
4337 }
4338 else if (time == 0)
4339 {
4340 /*
4341 * We didn't cross a time. If there is a future sample that we
4342 * haven't reached yet, and we've already reached at least one sample,
4343 * let's interpolate the local flushed time. This is mainly useful
4344 * for reporting a completely stuck apply position as having
4345 * increasing lag, since otherwise we'd have to wait for it to
4346 * eventually start moving again and cross one of our samples before
4347 * we can show the lag increasing.
4348 */
4350 {
4351 /* There are no future samples, so we can't interpolate. */
4352 return -1;
4353 }
4354 else if (lag_tracker->last_read[head].time != 0)
4355 {
4356 /* We can interpolate between last_read and the next sample. */
4357 double fraction;
4358 WalTimeSample prev = lag_tracker->last_read[head];
4360
4361 if (lsn < prev.lsn)
4362 {
4363 /*
4364 * Reported LSNs shouldn't normally go backwards, but it's
4365 * possible when there is a timeline change. Treat as not
4366 * found.
4367 */
4368 return -1;
4369 }
4370
4371 Assert(prev.lsn < next.lsn);
4372
4373 if (prev.time > next.time)
4374 {
4375 /* If the clock somehow went backwards, treat as not found. */
4376 return -1;
4377 }
4378
4379 /* See how far we are between the previous and next samples. */
4380 fraction =
4381 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4382
4383 /* Scale the local flush time proportionally. */
4384 time = (TimestampTz)
4385 ((double) prev.time + (next.time - prev.time) * fraction);
4386 }
4387 else
4388 {
4389 /*
4390 * We have only a future sample, implying that we were entirely
4391 * caught up, and now there is a new burst of WAL and the
4392 * standby hasn't processed the first sample yet. Until the
4393 * standby reaches the future sample the best we can do is report
4394 * the hypothetical lag if that sample were to be replayed now.
4395 */
4397 }
4398 }
4399
4400 /* Return the elapsed time since local flush time in microseconds. */
4401 Assert(time != 0);
4402 return now - time;
4403}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
static int32 next
Definition blutils.c:225
int64 TimestampTz
Definition timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition walsender.c:232
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:234
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:235
int write_head
Definition walsender.c:233
WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:249
TimestampTz time
Definition walsender.c:222
XLogRecPtr lsn
Definition walsender.c:221
#define LAG_TRACKER_BUFFER_SIZE
Definition walsender.c:226

References Assert, LagTracker::buffer, fb(), lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4228 of file walsender.c.

4229{
4230 int new_write_head;
4231 int i;
4232
4233 if (!am_walsender)
4234 return;
4235
4236 /*
4237 * If the lsn hasn't advanced since last time, then do nothing. This way
4238 * we only record a new sample when new WAL has been written.
4239 */
4240 if (lag_tracker->last_lsn == lsn)
4241 return;
4242 lag_tracker->last_lsn = lsn;
4243
4244 /*
4245 * If advancing the write head of the circular buffer would crash into any
4246 * of the read heads, then the buffer is full. In other words, the
4247 * slowest reader (presumably apply) is the one that controls the release
4248 * of space.
4249 */
4251 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4252 {
4253 /*
4254 * If the buffer is full, move the slowest reader to a separate
4255 * overflow entry and free its space in the buffer so the write head
4256 * can advance.
4257 */
4259 {
4262 lag_tracker->read_heads[i] = -1;
4263 }
4264 }
4265
4266 /* Store a sample at the current write head position. */
4270}
XLogRecPtr last_lsn
Definition walsender.c:231
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27

References am_walsender, LagTracker::buffer, fb(), i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState * state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char * cur_page 
)
static

Definition at line 1044 of file walsender.c.

1046{
1048 int count;
1050 XLogSegNo segno;
1051 TimeLineID currTLI;
1052
1053 /*
1054 * Make sure we have enough WAL available before retrieving the current
1055 * timeline.
1056 */
1058
1059 /* Fail if not enough (implies we are going to shut down) */
1061 return -1;
1062
1063 /*
1064 * Since logical decoding is also permitted on a standby server, we need
1065 * to check if the server is in recovery to decide how to get the current
1066 * timeline ID (so that it also covers the promotion or timeline change
1067 * cases). We must determine am_cascading_walsender after waiting for the
1068 * required WAL so that it is correct when the walsender wakes up after a
1069 * promotion.
1070 */
1072
1074 GetXLogReplayRecPtr(&currTLI);
1075 else
1076 currTLI = GetWALInsertionTimeLine();
1077
1079 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1080 sendTimeLine = state->currTLI;
1081 sendTimeLineValidUpto = state->currTLIValidUntil;
1082 sendTimeLineNextTLI = state->nextTLI;
1083
1085 count = XLOG_BLCKSZ; /* more than one block available */
1086 else
1087 count = flushptr - targetPagePtr; /* part of the page available */
1088
1089 /* now actually read the data, we know it's there */
1090 if (!WALRead(state,
1091 cur_page,
1093 count,
1094 currTLI, /* Pass the current TLI because only
1095 * WalSndSegmentOpen controls whether new TLI
1096 * is needed. */
1097 &errinfo))
1099
1100 /*
1101 * After reading into the buffer, check that what we read was valid. We do
1102 * this after reading, because even though the segment was present when we
1103 * opened it, it might get recycled or removed while we read it. The
1104 * read() succeeds in that case, but the data we tried to read might
1105 * already have been overwritten with new WAL records.
1106 */
1107 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1108 CheckXLogRemoved(segno, state->seg.ws_tli);
1109
1110 return count;
1111}
static TimeLineID sendTimeLine
Definition walsender.c:164
static bool sendTimeLineIsHistoric
Definition walsender.c:166
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition walsender.c:1823
static TimeLineID sendTimeLineNextTLI
Definition walsender.c:165
static XLogRecPtr sendTimeLineValidUpto
Definition walsender.c:167
TimeLineID GetWALInsertionTimeLine(void)
Definition xlog.c:6646
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition xlog.c:3764
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition xlogdefs.h:52
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1763 of file walsender.c.

1764{
1765 int elevel = got_STOPPING ? ERROR : WARNING;
1766 bool failover_slot;
1767
1769
1770 /*
1771 * Note that after receiving the shutdown signal, an ERROR is reported if
1772 * any slots are dropped, invalidated, or inactive. This measure is taken
1773 * to prevent the walsender from waiting indefinitely.
1774 */
1776 {
1778 return true;
1779 }
1780
1781 *wait_event = 0;
1782 return false;
1783}
#define WARNING
Definition elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3083

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, fb(), got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1795 of file walsender.c.

1797{
1798 /* Check if we need to wait for WALs to be flushed to disk */
1799 if (target_lsn > flushed_lsn)
1800 {
1802 return true;
1803 }
1804
1805 /* Check if the standby slots have caught up to the flushed position */
1807}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1763

References fb(), and NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 3980 of file walsender.c.

3981{
3982 Interval *result = palloc_object(Interval);
3983
3984 result->month = 0;
3985 result->day = 0;
3986 result->time = offset;
3987
3988 return result;
3989}
#define palloc_object(type)
Definition fe_memutils.h:74
int32 day
Definition timestamp.h:51
int32 month
Definition timestamp.h:52
TimeOffset time
Definition timestamp.h:49

References Interval::day, Interval::month, palloc_object, and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd,
bool * reserve_wal,
CRSSnapshotAction * snapshot_action,
bool * two_phase,
bool * failover 
)
static

Definition at line 1117 of file walsender.c.

1121{
1122 ListCell *lc;
1123 bool snapshot_action_given = false;
1124 bool reserve_wal_given = false;
1125 bool two_phase_given = false;
1126 bool failover_given = false;
1127
1128 /* Parse options */
1129 foreach(lc, cmd->options)
1130 {
1131 DefElem *defel = (DefElem *) lfirst(lc);
1132
1133 if (strcmp(defel->defname, "snapshot") == 0)
1134 {
1135 char *action;
1136
1138 ereport(ERROR,
1140 errmsg("conflicting or redundant options")));
1141
1143 snapshot_action_given = true;
1144
1145 if (strcmp(action, "export") == 0)
1147 else if (strcmp(action, "nothing") == 0)
1149 else if (strcmp(action, "use") == 0)
1151 else
1152 ereport(ERROR,
1154 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
1155 "CREATE_REPLICATION_SLOT", defel->defname, action)));
1156 }
1157 else if (strcmp(defel->defname, "reserve_wal") == 0)
1158 {
1160 ereport(ERROR,
1162 errmsg("conflicting or redundant options")));
1163
1164 reserve_wal_given = true;
1166 }
1167 else if (strcmp(defel->defname, "two_phase") == 0)
1168 {
1170 ereport(ERROR,
1172 errmsg("conflicting or redundant options")));
1173 two_phase_given = true;
1175 }
1176 else if (strcmp(defel->defname, "failover") == 0)
1177 {
1179 ereport(ERROR,
1181 errmsg("conflicting or redundant options")));
1182 failover_given = true;
1184 }
1185 else
1186 elog(ERROR, "unrecognized option: %s", defel->defname);
1187 }
1188}
char * defGetString(DefElem *def)
Definition define.c:34
#define lfirst(lc)
Definition pg_list.h:172
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), elog, ereport, errcode(), errmsg(), ERROR, failover, fb(), CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3996 of file walsender.c.

3997{
3998#define PG_STAT_GET_WAL_SENDERS_COLS 12
3999 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4001 int num_standbys;
4002 int i;
4003
4004 InitMaterializedSRF(fcinfo, 0);
4005
4006 /*
4007 * Get the currently active synchronous standbys. This could be out of
4008 * date before we're done, but we'll use the data anyway.
4009 */
4011
4012 for (i = 0; i < max_wal_senders; i++)
4013 {
4017 XLogRecPtr flush;
4018 XLogRecPtr apply;
4019 TimeOffset writeLag;
4020 TimeOffset flushLag;
4021 TimeOffset applyLag;
4022 int priority;
4023 int pid;
4025 TimestampTz replyTime;
4026 bool is_sync_standby;
4028 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
4029 int j;
4030
4031 /* Collect data from shared memory */
4032 SpinLockAcquire(&walsnd->mutex);
4033 if (walsnd->pid == 0)
4034 {
4035 SpinLockRelease(&walsnd->mutex);
4036 continue;
4037 }
4038 pid = walsnd->pid;
4039 sent_ptr = walsnd->sentPtr;
4040 state = walsnd->state;
4041 write = walsnd->write;
4042 flush = walsnd->flush;
4043 apply = walsnd->apply;
4044 writeLag = walsnd->writeLag;
4045 flushLag = walsnd->flushLag;
4046 applyLag = walsnd->applyLag;
4047 priority = walsnd->sync_standby_priority;
4048 replyTime = walsnd->replyTime;
4049 SpinLockRelease(&walsnd->mutex);
4050
4051 /*
4052 * Detect whether walsender is/was considered synchronous. We can
4053 * provide some protection against stale data by checking the PID
4054 * along with walsnd_index.
4055 */
4056 is_sync_standby = false;
4057 for (j = 0; j < num_standbys; j++)
4058 {
4059 if (sync_standbys[j].walsnd_index == i &&
4060 sync_standbys[j].pid == pid)
4061 {
4062 is_sync_standby = true;
4063 break;
4064 }
4065 }
4066
4067 values[0] = Int32GetDatum(pid);
4068
4070 {
4071 /*
4072 * Only superusers and roles with privileges of pg_read_all_stats
4073 * can see details. Other users only get the pid value to know
4074 * it's a walsender, but no details.
4075 */
4076 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4077 }
4078 else
4079 {
4081
4083 nulls[2] = true;
4085
4087 nulls[3] = true;
4088 values[3] = LSNGetDatum(write);
4089
4090 if (!XLogRecPtrIsValid(flush))
4091 nulls[4] = true;
4092 values[4] = LSNGetDatum(flush);
4093
4094 if (!XLogRecPtrIsValid(apply))
4095 nulls[5] = true;
4096 values[5] = LSNGetDatum(apply);
4097
4098 /*
4099 * Treat a standby such as a pg_basebackup background process
4100 * which always returns an invalid flush location, as an
4101 * asynchronous standby.
4102 */
4103 priority = XLogRecPtrIsValid(flush) ? priority : 0;
4104
4105 if (writeLag < 0)
4106 nulls[6] = true;
4107 else
4109
4110 if (flushLag < 0)
4111 nulls[7] = true;
4112 else
4114
4115 if (applyLag < 0)
4116 nulls[8] = true;
4117 else
4119
4121
4122 /*
4123 * More easily understood version of standby state. This is purely
4124 * informational.
4125 *
4126 * In quorum-based sync replication, the role of each standby
4127 * listed in synchronous_standby_names can be changing very
4128 * frequently. Any standbys considered as "sync" at one moment can
4129 * be switched to "potential" ones at the next moment. So, it's
4130 * basically useless to report "sync" or "potential" as their sync
4131 * states. We report just "quorum" for them.
4132 */
4133 if (priority == 0)
4134 values[10] = CStringGetTextDatum("async");
4135 else if (is_sync_standby)
4137 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4138 else
4139 values[10] = CStringGetTextDatum("potential");
4140
4141 if (replyTime == 0)
4142 nulls[11] = true;
4143 else
4144 values[11] = TimestampTzGetDatum(replyTime);
4145 }
4146
4147 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4148 values, nulls);
4149 }
4150
4151 return (Datum) 0;
4152}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5284
#define MemSet(start, val, len)
Definition c.h:1013
int64 TimeOffset
Definition timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
#define write(a, b, c)
Definition win32.h:14
int j
Definition isn.c:78
Oid GetUserId(void)
Definition miscinit.c:469
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum Int32GetDatum(int32 X)
Definition postgres.h:222
uint8 syncrep_method
Definition syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:754
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition walsender.c:3980
static const char * WalSndGetStateString(WalSndState state)
Definition walsender.c:3961
WalSndState
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References CStringGetTextDatum, fb(), GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, SpinLockAcquire, SpinLockRelease, SYNC_REP_PRIORITY, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, write, and XLogRecPtrIsValid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2401 of file walsender.c.

2402{
2403 bool changed = false;
2405
2407 SpinLockAcquire(&slot->mutex);
2408 if (slot->data.restart_lsn != lsn)
2409 {
2410 changed = true;
2411 slot->data.restart_lsn = lsn;
2412 }
2413 SpinLockRelease(&slot->mutex);
2414
2415 if (changed)
2416 {
2420 }
2421
2422 /*
2423 * One could argue that the slot should be saved to disk now, but that'd
2424 * be energy wasted - the worst thing lost information could cause here is
2425 * to give wrong information in a statistics view - we'll just potentially
2426 * be more conservative in removing files.
2427 */
2428}
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1297
slock_t mutex
Definition slot.h:183
void PhysicalWakeupLogicalWalSnd(void)
Definition walsender.c:1738

References Assert, ReplicationSlot::data, fb(), ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, SpinLockRelease, and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2539 of file walsender.c.

2540{
2541 bool changed = false;
2543
2544 SpinLockAcquire(&slot->mutex);
2546
2547 /*
2548 * For physical replication we don't need the interlock provided by xmin
2549 * and effective_xmin since the consequences of a missed increase are
2550 * limited to query cancellations, so set both at once.
2551 */
2552 if (!TransactionIdIsNormal(slot->data.xmin) ||
2555 {
2556 changed = true;
2557 slot->data.xmin = feedbackXmin;
2559 }
2563 {
2564 changed = true;
2567 }
2568 SpinLockRelease(&slot->mutex);
2569
2570 if (changed)
2571 {
2574 }
2575}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1215
TransactionId catalog_xmin
Definition slot.h:122
TransactionId effective_catalog_xmin
Definition slot.h:207
TransactionId effective_xmin
Definition slot.h:206
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1738 of file walsender.c.

1739{
1741
1742 /*
1743 * If we are running in a standby, there is no need to wake up walsenders.
1744 * This is because we do not support syncing slots to cascading standbys,
1745 * so there are no walsenders waiting for standbys to catch up.
1746 */
1747 if (RecoveryInProgress())
1748 return;
1749
1752}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3050
#define SlotIsPhysical(slot)
Definition slot.h:284
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1619 of file walsender.c.

1620{
1621 for (;;)
1622 {
1623 long sleeptime;
1624
1625 /* Check for input from the client */
1627
1628 /* die if timeout was reached */
1630
1631 /* Send keepalive if the time has come */
1633
1634 if (!pq_is_send_pending())
1635 break;
1636
1638
1639 /* Sleep until something happens or we time out */
1642
1643 /* Clear any already-pending wakeups */
1645
1647
1648 /* Process any requests or signals received recently */
1650 {
1651 ConfigReloadPending = false;
1654 }
1655
1656 /* Try to flush pending output to the client */
1657 if (pq_flush_if_writable() != 0)
1659 }
1660
1661 /* reactivate latch so WalSndLoop knows to continue */
1663}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
struct Latch * MyLatch
Definition globals.c:63
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
#define pq_flush_if_writable()
Definition libpq.h:47
#define pq_is_send_pending()
Definition libpq.h:48
void SyncRepInitConfig(void)
Definition syncrep.c:445
#define WL_SOCKET_READABLE
#define WL_SOCKET_WRITEABLE
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition walsender.c:3820
static void WalSndCheckTimeOut(void)
Definition walsender.c:2852
static void ProcessRepliesIfAny(void)
Definition walsender.c:2250
static void WalSndKeepaliveIfNecessary(void)
Definition walsender.c:4190
static pg_noreturn void WalSndShutdown(void)
Definition walsender.c:384
static long WalSndComputeSleeptime(TimestampTz now)
Definition walsender.c:2808

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, fb(), GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2250 of file walsender.c.

2251{
2252 unsigned char firstchar;
2253 int maxmsglen;
2254 int r;
2255 bool received = false;
2256
2258
2259 /*
2260 * If we already received a CopyDone from the frontend, any subsequent
2261 * message is the beginning of a new command, and should be processed in
2262 * the main processing loop.
2263 */
2264 while (!streamingDoneReceiving)
2265 {
2268 if (r < 0)
2269 {
2270 /* unexpected error or EOF */
2273 errmsg("unexpected EOF on standby connection")));
2274 proc_exit(0);
2275 }
2276 if (r == 0)
2277 {
2278 /* no data available without blocking */
2279 pq_endmsgread();
2280 break;
2281 }
2282
2283 /* Validate message type and set packet size limit */
2284 switch (firstchar)
2285 {
2286 case PqMsg_CopyData:
2288 break;
2289 case PqMsg_CopyDone:
2290 case PqMsg_Terminate:
2292 break;
2293 default:
2294 ereport(FATAL,
2296 errmsg("invalid standby message type \"%c\"",
2297 firstchar)));
2298 maxmsglen = 0; /* keep compiler quiet */
2299 break;
2300 }
2301
2302 /* Read the message contents */
2305 {
2308 errmsg("unexpected EOF on standby connection")));
2309 proc_exit(0);
2310 }
2311
2312 /* ... and process it */
2313 switch (firstchar)
2314 {
2315 /*
2316 * PqMsg_CopyData means a standby reply wrapped in a CopyData
2317 * packet.
2318 */
2319 case PqMsg_CopyData:
2321 received = true;
2322 break;
2323
2324 /*
2325 * PqMsg_CopyDone means the standby requested to finish
2326 * streaming. Reply with CopyDone, if we had not sent that
2327 * already.
2328 */
2329 case PqMsg_CopyDone:
2331 {
2333 streamingDoneSending = true;
2334 }
2335
2337 received = true;
2338 break;
2339
2340 /*
2341 * PqMsg_Terminate means that the standby is closing down the
2342 * socket.
2343 */
2344 case PqMsg_Terminate:
2345 proc_exit(0);
2346
2347 default:
2348 Assert(false); /* NOT REACHED */
2349 }
2350 }
2351
2352 /*
2353 * Save the last reply timestamp if we've received at least one reply.
2354 */
2355 if (received)
2356 {
2359 }
2360}
#define COMMERROR
Definition elog.h:33
#define FATAL
Definition elog.h:41
void proc_exit(int code)
Definition ipc.c:105
#define pq_putmessage_noblock(msgtype, s, len)
Definition libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition pqcomm.c:1003
void pq_endmsgread(void)
Definition pqcomm.c:1165
#define PqMsg_Terminate
Definition protocol.h:28
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static bool waiting_for_ping_response
Definition walsender.c:190
static TimestampTz last_processing
Definition walsender.c:181
static bool streamingDoneSending
Definition walsender.c:198
static void ProcessStandbyMessage(void)
Definition walsender.c:2366
static bool streamingDoneReceiving
Definition walsender.c:199

References Assert, COMMERROR, ereport, errcode(), errmsg(), FATAL, fb(), GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2619 of file walsender.c.

2620{
2625 TimestampTz replyTime;
2626
2627 /*
2628 * Decipher the reply message. The caller already consumed the msgtype
2629 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2630 * of this message.
2631 */
2632 replyTime = pq_getmsgint64(&reply_message);
2637
2639 {
2640 char *replyTimeStr;
2641
2642 /* Copy because timestamptz_to_str returns a static buffer */
2644
2645 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2650 replyTimeStr);
2651
2653 }
2654
2655 /*
2656 * Update shared state for this WalSender process based on reply data from
2657 * standby.
2658 */
2659 {
2661
2662 SpinLockAcquire(&walsnd->mutex);
2663 walsnd->replyTime = replyTime;
2664 SpinLockRelease(&walsnd->mutex);
2665 }
2666
2667 /*
2668 * Unset WalSender's xmins if the feedback message values are invalid.
2669 * This happens when the downstream turned hot_standby_feedback off.
2670 */
2673 {
2675 if (MyReplicationSlot != NULL)
2677 return;
2678 }
2679
2680 /*
2681 * Check that the provided xmin/epoch are sane, that is, not in the future
2682 * and not so far back as to be already wrapped around. Ignore if not.
2683 */
2686 return;
2687
2690 return;
2691
2692 /*
2693 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2694 * the xmin will be taken into account by GetSnapshotData() /
2695 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2696 * thereby prevent the generation of cleanup conflicts on the standby
2697 * server.
2698 *
2699 * There is a small window for a race condition here: although we just
2700 * checked that feedbackXmin precedes nextXid, the nextXid could have
2701 * gotten advanced between our fetching it and applying the xmin below,
2702 * perhaps far enough to make feedbackXmin wrap around. In that case the
2703 * xmin we set here would be "in the future" and have no effect. No point
2704 * in worrying about this since it's too late to save the desired data
2705 * anyway. Assuming that the standby sends us an increasing sequence of
2706 * xmins, this could only happen during the first reply cycle, else our
2707 * own xmin would prevent nextXid from advancing so far.
2708 *
2709 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2710 * is assumed atomic, and there's no real need to prevent concurrent
2711 * horizon determinations. (If we're moving our xmin forward, this is
2712 * obviously safe, and if we're moving it backwards, well, the data is at
2713 * risk already since a VACUUM could already have determined the horizon.)
2714 *
2715 * If we're using a replication slot we reserve the xmin via that,
2716 * otherwise via the walsender's PGPROC entry. We can only track the
2717 * catalog xmin separately when using a slot, so we store the least of the
2718 * two provided when not using a slot.
2719 *
2720 * XXX: It might make sense to generalize the ephemeral slot concept and
2721 * always use the slot mechanism to handle the feedback xmin.
2722 */
2723 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2725 else
2726 {
2730 else
2732 }
2733}
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1862
uint32_t uint32
Definition c.h:546
uint32 TransactionId
Definition c.h:666
bool message_level_is_interesting(int elevel)
Definition elog.c:273
#define DEBUG2
Definition elog.h:29
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition walsender.c:2539
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition walsender.c:2588

References DEBUG2, elog, fb(), InvalidTransactionId, message_level_is_interesting(), MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2366 of file walsender.c.

2367{
2368 char msgtype;
2369
2370 /*
2371 * Check message type from the first byte.
2372 */
2374
2375 switch (msgtype)
2376 {
2379 break;
2380
2383 break;
2384
2387 break;
2388
2389 default:
2392 errmsg("unexpected message type \"%c\"", msgtype)));
2393 proc_exit(0);
2394 }
2395}
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static void ProcessStandbyHSFeedbackMessage(void)
Definition walsender.c:2619
static void ProcessStandbyPSRequestMessage(void)
Definition walsender.c:2739
static void ProcessStandbyReplyMessage(void)
Definition walsender.c:2434

References COMMERROR, ereport, errcode(), errmsg(), fb(), pq_getmsgbyte(), PqReplMsg_HotStandbyFeedback, PqReplMsg_PrimaryStatusRequest, PqReplMsg_StandbyStatusUpdate, proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyPSRequestMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyPSRequestMessage()

static void ProcessStandbyPSRequestMessage ( void  )
static

Definition at line 2739 of file walsender.c.

2740{
2747 TimestampTz replyTime;
2748
2749 /*
2750 * This shouldn't happen because we don't support getting a primary status
2751 * message from a standby.
2752 */
2753 if (RecoveryInProgress())
2754 elog(ERROR, "the primary status is unavailable during recovery");
2755
2756 replyTime = pq_getmsgint64(&reply_message);
2757
2758 /*
2759 * Update shared state for this WalSender process based on reply data from
2760 * standby.
2761 */
2762 SpinLockAcquire(&walsnd->mutex);
2763 walsnd->replyTime = replyTime;
2764 SpinLockRelease(&walsnd->mutex);
2765
2766 /*
2767 * Consider transactions in the current database, as only these are the
2768 * ones replicated.
2769 */
2772
2773 /*
2774 * Update the oldest xid for standby transmission if an older prepared
2775 * transaction exists and is currently in commit phase.
2776 */
2780
2784 lsn = GetXLogWriteRecPtr();
2785
2786 elog(DEBUG2, "sending primary status");
2787
2788 /* construct the message... */
2795
2796 /* ... and send it wrapped in CopyData */
2798}
int64_t int64
Definition c.h:543
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2835
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
Definition transam.h:443
#define U64FromFullTransactionId(x)
Definition transam.h:49
#define TransactionIdIsValid(xid)
Definition transam.h:41
TransactionId TwoPhaseGetOldestXidInCommit(void)
Definition twophase.c:2829
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:288
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:9614

References StringInfoData::data, DEBUG2, elog, ERROR, fb(), FullTransactionIdFromAllowableAt(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), GetXLogWriteRecPtr(), InvalidXLogRecPtr, StringInfoData::len, MyWalSnd, output_message, pq_getmsgint64(), pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_PrimaryStatusUpdate, ReadNextFullTransactionId(), RecoveryInProgress(), reply_message, resetStringInfo(), SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, TransactionIdPrecedes(), TwoPhaseGetOldestXidInCommit(), and U64FromFullTransactionId.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2434 of file walsender.c.

2435{
2437 flushPtr,
2438 applyPtr;
2439 bool replyRequested;
2440 TimeOffset writeLag,
2441 flushLag,
2442 applyLag;
2443 bool clearLagTimes;
2445 TimestampTz replyTime;
2446
2447 static bool fullyAppliedLastTime = false;
2448
2449 /* the caller already consumed the msgtype byte */
2453 replyTime = pq_getmsgint64(&reply_message);
2455
2457 {
2458 char *replyTimeStr;
2459
2460 /* Copy because timestamptz_to_str returns a static buffer */
2462
2463 elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
2467 replyRequested ? " (reply requested)" : "",
2468 replyTimeStr);
2469
2471 }
2472
2473 /* See if we can compute the round-trip lag for these positions. */
2478
2479 /*
2480 * If the standby reports that it has fully replayed the WAL in two
2481 * consecutive reply messages, then the second such message must result
2482 * from wal_receiver_status_interval expiring on the standby. This is a
2483 * convenient time to forget the lag times measured when it last
2484 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2485 * until more WAL traffic arrives.
2486 */
2487 clearLagTimes = false;
2488 if (applyPtr == sentPtr)
2489 {
2491 clearLagTimes = true;
2492 fullyAppliedLastTime = true;
2493 }
2494 else
2495 fullyAppliedLastTime = false;
2496
2497 /* Send a reply if the standby requested one. */
2498 if (replyRequested)
2500
2501 /*
2502 * Update shared state for this WalSender process based on reply data from
2503 * standby.
2504 */
2505 {
2507
2508 SpinLockAcquire(&walsnd->mutex);
2509 walsnd->write = writePtr;
2510 walsnd->flush = flushPtr;
2511 walsnd->apply = applyPtr;
2512 if (writeLag != -1 || clearLagTimes)
2513 walsnd->writeLag = writeLag;
2514 if (flushLag != -1 || clearLagTimes)
2515 walsnd->flushLag = flushLag;
2516 if (applyLag != -1 || clearLagTimes)
2517 walsnd->applyLag = applyLag;
2518 walsnd->replyTime = replyTime;
2519 SpinLockRelease(&walsnd->mutex);
2520 }
2521
2524
2525 /*
2526 * Advance our local xmin horizon when the client confirmed a flush.
2527 */
2529 {
2532 else
2534 }
2535}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition logical.c:1811
#define SlotIsLogical(slot)
Definition slot.h:285
void SyncRepReleaseWaiters(void)
Definition syncrep.c:474
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
static XLogRecPtr sentPtr
Definition walsender.c:173
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition walsender.c:2401
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition walsender.c:4167
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition walsender.c:4286

References am_cascading_walsender, DEBUG2, elog, fb(), GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 481 of file walsender.c.

482{
483#define READ_REPLICATION_SLOT_COLS 3
484 ReplicationSlot *slot;
487 TupleDesc tupdesc;
489 bool nulls[READ_REPLICATION_SLOT_COLS];
490
492 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
493 TEXTOID, -1, 0);
494 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
495 TEXTOID, -1, 0);
496 /* TimeLineID is unsigned, so int4 is not wide enough. */
497 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
498 INT8OID, -1, 0);
499
500 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
501
503 slot = SearchNamedReplicationSlot(cmd->slotname, false);
504 if (slot == NULL || !slot->in_use)
505 {
507 }
508 else
509 {
511 int i = 0;
512
513 /* Copy slot contents while holding spinlock */
514 SpinLockAcquire(&slot->mutex);
515 slot_contents = *slot;
516 SpinLockRelease(&slot->mutex);
518
519 if (OidIsValid(slot_contents.data.database))
522 errmsg("cannot use %s with a logical replication slot",
523 "READ_REPLICATION_SLOT"));
524
525 /* slot type */
526 values[i] = CStringGetTextDatum("physical");
527 nulls[i] = false;
528 i++;
529
530 /* start LSN */
531 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
532 {
533 char xloc[64];
534
535 snprintf(xloc, sizeof(xloc), "%X/%08X",
536 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
538 nulls[i] = false;
539 }
540 i++;
541
542 /* timeline this WAL was produced on */
543 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
544 {
548
549 /*
550 * While in recovery, use as timeline the currently-replaying one
551 * to get the LSN position's history.
552 */
553 if (RecoveryInProgress())
555 else
557
562 nulls[i] = false;
563 }
564 i++;
565
567 }
568
571 do_tup_output(tstate, values, nulls);
573}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition timeline.c:76
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition timeline.c:544
#define OidIsValid(objectId)
Definition c.h:788
@ LW_SHARED
Definition lwlock.h:113
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:540
Definition pg_list.h:54
bool in_use
Definition slot.h:186
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsValid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 580 of file walsender.c.

581{
583 TupleDesc tupdesc;
586 char path[MAXPGPATH];
587 int fd;
590 Size len;
591
593
594 /*
595 * Reply with a result set with one row, and two columns. The first col is
596 * the name of the history file, 2nd is the contents.
597 */
598 tupdesc = CreateTemplateTupleDesc(2);
599 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
600 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
601
603 TLHistoryFilePath(path, cmd->timeline);
604
605 /* Send a RowDescription message */
606 dest->rStartup(dest, CMD_SELECT, tupdesc);
607
608 /* Send a DataRow message */
610 pq_sendint16(&buf, 2); /* # of columns */
612 pq_sendint32(&buf, len); /* col1 len */
614
616 if (fd < 0)
619 errmsg("could not open file \"%s\": %m", path)));
620
621 /* Determine file length and send it to client */
623 if (histfilelen < 0)
626 errmsg("could not seek to end of file \"%s\": %m", path)));
627 if (lseek(fd, 0, SEEK_SET) != 0)
630 errmsg("could not seek to beginning of file \"%s\": %m", path)));
631
632 pq_sendint32(&buf, histfilelen); /* col2 len */
633
635 while (bytesleft > 0)
636 {
638 int nread;
639
641 nread = read(fd, rbuf.data, sizeof(rbuf));
643 if (nread < 0)
646 errmsg("could not read file \"%s\": %m",
647 path)));
648 else if (nread == 0)
651 errmsg("could not read file \"%s\": read %d of %zu",
652 path, nread, (Size) bytesleft)));
653
654 pq_sendbytes(&buf, rbuf.data, nread);
655 bytesleft -= nread;
656 }
657
658 if (CloseTransientFile(fd) != 0)
661 errmsg("could not close file \"%s\": %m", path)));
662
664}
#define PG_BINARY
Definition c.h:1287
size_t Size
Definition c.h:619
int errcode_for_file_access(void)
Definition elog.c:886
int CloseTransientFile(int fd)
Definition fd.c:2851
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2674
#define read(a, b, c)
Definition win32.h:13
@ CMD_SELECT
Definition nodes.h:275
#define ERRCODE_DATA_CORRUPTED
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition pqformat.h:136
static int fd(const char *x, int i)
#define PqMsg_DataRow
Definition protocol.h:43
TimeLineID timeline
Definition replnodes.h:120
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fb(), fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1457 of file walsender.c.

1458{
1460 QueryCompletion qc;
1461
1462 /* make sure that our requirements are still fulfilled */
1464
1466
1467 ReplicationSlotAcquire(cmd->slotname, true, true);
1468
1469 /*
1470 * Force a disconnect, so that the decoding code doesn't need to care
1471 * about an eventual switch from running in recovery, to running in a
1472 * normal environment. Client code is expected to handle reconnects.
1473 */
1475 {
1476 ereport(LOG,
1477 (errmsg("terminating walsender process after promotion")));
1478 got_STOPPING = true;
1479 }
1480
1481 /*
1482 * Create our decoding context, making it start at the previously ack'ed
1483 * position.
1484 *
1485 * Do this before sending a CopyBothResponse message, so that any errors
1486 * are reported early.
1487 */
1489 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1491 .segment_open = WalSndSegmentOpen,
1492 .segment_close = wal_segment_close),
1496
1498
1499 /* Send a CopyBothResponse message, and start streaming */
1501 pq_sendbyte(&buf, 0);
1502 pq_sendint16(&buf, 0);
1504 pq_flush();
1505
1506 /* Start reading WAL from the oldest required WAL. */
1509
1510 /*
1511 * Report the location after which we'll send out further commits as the
1512 * current sentPtr.
1513 */
1515
1516 /* Also update the sent position status in shared memory */
1520
1521 replication_active = true;
1522
1524
1525 /* Main loop of walsender */
1527
1530
1531 replication_active = false;
1532 if (got_STOPPING)
1533 proc_exit(0);
1535
1536 /* Get out of COPY mode (CommandComplete). */
1538 EndCommand(&qc, DestRemote, false);
1539}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition dest.c:169
@ DestRemote
Definition dest.h:89
#define pq_flush()
Definition libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:489
#define PqMsg_CopyBothResponse
Definition protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:620
XLogReaderState * reader
Definition logical.h:42
XLogRecPtr startpoint
Definition replnodes.h:97
slock_t mutex
XLogRecPtr sentPtr
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition walsender.c:2879
static LogicalDecodingContext * logical_decoding_ctx
Definition walsender.c:216
static void XLogSendLogical(void)
Definition walsender.c:3501
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition xlogreader.c:232

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), fb(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 812 of file walsender.c.

813{
817
818 /* create xlogreader for physical replication */
819 xlogreader =
821 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
822 .segment_close = wal_segment_close),
823 NULL);
824
825 if (!xlogreader)
828 errmsg("out of memory"),
829 errdetail("Failed while allocating a WAL reading processor.")));
830
831 /*
832 * We assume here that we're logging enough information in the WAL for
833 * log-shipping, since this is checked in PostmasterMain().
834 *
835 * NOTE: wal_level can only change at shutdown, so in most cases it is
836 * difficult for there to be WAL data that we can still see that was
837 * written at wal_level='minimal'.
838 */
839
840 if (cmd->slotname)
841 {
842 ReplicationSlotAcquire(cmd->slotname, true, true);
846 errmsg("cannot use a logical replication slot for physical replication")));
847
848 /*
849 * We don't need to verify the slot's restart_lsn here; instead we
850 * rely on the caller requesting the starting point to use. If the
851 * WAL segment doesn't exist, we'll fail later.
852 */
853 }
854
855 /*
856 * Select the timeline. If it was given explicitly by the client, use
857 * that. Otherwise use the timeline of the last replayed record.
858 */
862 else
864
865 if (cmd->timeline != 0)
866 {
868
869 sendTimeLine = cmd->timeline;
870 if (sendTimeLine == FlushTLI)
871 {
874 }
875 else
876 {
878
880
881 /*
882 * Check that the timeline the client requested exists, and the
883 * requested start location is on that timeline.
884 */
889
890 /*
891 * Found the requested timeline in the history. Check that
892 * requested startpoint is on that timeline in our history.
893 *
894 * This is quite loose on purpose. We only check that we didn't
895 * fork off the requested timeline before the switchpoint. We
896 * don't check that we switched *to* it before the requested
897 * starting point. This is because the client can legitimately
898 * request to start replication from the beginning of the WAL
899 * segment that contains switchpoint, but on the new timeline, so
900 * that it doesn't end up with a partial segment. If you ask for
901 * too old a starting point, you'll get an error later when we
902 * fail to find the requested WAL segment in pg_wal.
903 *
904 * XXX: we could be more strict here and only allow a startpoint
905 * that's older than the switchpoint, if it's still in the same
906 * WAL segment.
907 */
909 switchpoint < cmd->startpoint)
910 {
912 errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
914 cmd->timeline),
915 errdetail("This server's history forked from timeline %u at %X/%08X.",
916 cmd->timeline,
918 }
920 }
921 }
922 else
923 {
927 }
928
930
931 /* If there is nothing to stream, don't even enter COPY mode */
933 {
934 /*
935 * When we first start replication the standby will be behind the
936 * primary. For some applications, for example synchronous
937 * replication, it is important to have a clear state for this initial
938 * catchup mode, so we can trigger actions when we change streaming
939 * state later. We may stay in this state for a long time, which is
940 * exactly why we want to be able to monitor whether or not we are
941 * still here.
942 */
944
945 /* Send a CopyBothResponse message, and start streaming */
947 pq_sendbyte(&buf, 0);
948 pq_sendint16(&buf, 0);
950 pq_flush();
951
952 /*
953 * Don't allow a request to stream from a future point in WAL that
954 * hasn't been flushed to disk in this server yet.
955 */
956 if (FlushPtr < cmd->startpoint)
957 {
959 errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
962 }
963
964 /* Start streaming from the requested point */
965 sentPtr = cmd->startpoint;
966
967 /* Initialize shared memory status, too */
971
973
974 /* Main loop of walsender */
975 replication_active = true;
976
978
979 replication_active = false;
980 if (got_STOPPING)
981 proc_exit(0);
983
985 }
986
987 if (cmd->slotname)
989
990 /*
991 * Copy is finished now. Send a single-row result set indicating the next
992 * timeline.
993 */
995 {
996 char startpos_str[8 + 1 + 8 + 1];
999 TupleDesc tupdesc;
1000 Datum values[2];
1001 bool nulls[2] = {0};
1002
1003 snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
1005
1007
1008 /*
1009 * Need a tuple descriptor representing two columns. int8 may seem
1010 * like a surprising data type for this, but in theory int4 would not
1011 * be wide enough for this, as TimeLineID is unsigned.
1012 */
1013 tupdesc = CreateTemplateTupleDesc(2);
1014 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1015 INT8OID, -1, 0);
1016 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1017 TEXTOID, -1, 0);
1018
1019 /* prepare for projection of tuple */
1021
1024
1025 /* send it to dest */
1026 do_tup_output(tstate, values, nulls);
1027
1029 }
1030
1031 /* Send CommandComplete message */
1032 EndReplicationCommand("START_STREAMING");
1033}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition timeline.c:572
int errdetail(const char *fmt,...)
Definition elog.c:1216
void list_free_deep(List *list)
Definition list.c:1560
TimeLineID timeline
Definition replnodes.h:96
static void XLogSendPhysical(void)
Definition walsender.c:3191
int wal_segment_size
Definition xlog.c:146
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition xlogreader.c:107

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, fb(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsValid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2588 of file walsender.c.

2589{
2591 TransactionId nextXid;
2593
2597
2598 if (xid <= nextXid)
2599 {
2600 if (epoch != nextEpoch)
2601 return false;
2602 }
2603 else
2604 {
2605 if (epoch + 1 != nextEpoch)
2606 return false;
2607 }
2608
2609 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2610 return false; /* epoch OK, but it's wrapped around */
2611
2612 return true;
2613}
#define EpochFromFullTransactionId(x)
Definition transam.h:47
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define XidFromFullTransactionId(x)
Definition transam.h:48
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, fb(), ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 670 of file walsender.c.

671{
672 MemoryContext mcxt;
674 off_t offset = 0;
676
677 /*
678 * parsing the manifest will use the cryptohash stuff, which requires a
679 * resource owner
680 */
685
686 /* Prepare to read manifest data into a temporary context. */
688 "incremental backup information",
691
692 /* Send a CopyInResponse message */
694 pq_sendbyte(&buf, 0);
695 pq_sendint16(&buf, 0);
697 pq_flush();
698
699 /* Receive packets from client until done. */
700 while (HandleUploadManifestPacket(&buf, &offset, ib))
701 ;
702
703 /* Finish up manifest processing. */
705
706 /*
707 * Discard any old manifest information and arrange to preserve the new
708 * information we just got.
709 *
710 * We assume that MemoryContextDelete and MemoryContextSetParent won't
711 * fail, and thus we shouldn't end up bailing out of here in such a way as
712 * to leave dangling pointers.
713 */
719
720 /* clean up the resource owner we created */
722}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:686
MemoryContext CacheMemoryContext
Definition mcxt.c:169
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
void pq_endmessage_reuse(StringInfo buf)
Definition pqformat.c:313
#define PqMsg_CopyInResponse
Definition protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition resowner.c:1016
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition walsender.c:736
static MemoryContext uploaded_manifest_mcxt
Definition walsender.c:156

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, fb(), FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2852 of file walsender.c.

2853{
2855
2856 /* don't bail out if we're doing something that doesn't require timeouts */
2857 if (last_reply_timestamp <= 0)
2858 return;
2859
2862
2864 {
2865 /*
2866 * Since typically expiration of replication timeout means
2867 * communication problem, we don't send the error message to the
2868 * standby.
2869 */
2871 (errmsg("terminating walsender process due to replication timeout")));
2872
2874 }
2875}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
int wal_sender_timeout
Definition walsender.c:131

References COMMERROR, ereport, errmsg(), fb(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2808 of file walsender.c.

2809{
2810 long sleeptime = 10000; /* 10 s */
2811
2813 {
2815
2816 /*
2817 * At the latest stop sleeping once wal_sender_timeout has been
2818 * reached.
2819 */
2822
2823 /*
2824 * If no ping has been sent yet, wakeup when it's time to do so.
2825 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2826 * the timeout passed without a response.
2827 */
2830 wal_sender_timeout / 2);
2831
2832 /* Compute relative time until wakeup. */
2834 }
2835
2836 return sleeptime;
2837}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757

References fb(), last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3594 of file walsender.c.

3595{
3597
3598 /* ... let's just be real sure we're caught up ... */
3599 send_data();
3600
3601 /*
3602 * To figure out whether all WAL has successfully been replicated, check
3603 * flush location if valid, write otherwise. Tools like pg_receivewal will
3604 * usually (unless in synchronous mode) return an invalid flush location.
3605 */
3608
3611 {
3612 QueryCompletion qc;
3613
3614 /* Inform the standby that XLOG streaming is done */
3616 EndCommand(&qc, DestRemote, false);
3617 pq_flush();
3618
3619 proc_exit(0);
3620 }
3623}
XLogRecPtr flush
XLogRecPtr write
static bool WalSndCaughtUp
Definition walsender.c:202

References DestRemote, EndCommand(), fb(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsValid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 348 of file walsender.c.

349{
354
355 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
357
358 if (MyReplicationSlot != NULL)
360
362
363 replication_active = false;
364
365 /*
366 * If there is a transaction in progress, it will clean up our
367 * ResourceOwner, but if a replication command set up a resource owner
368 * without a transaction, we've got to clean that up now.
369 */
372
374 proc_exit(0);
375
376 /* Revert back to startup state */
378}
void pgaio_error_cleanup(void)
Definition aio.c:1165
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition lwlock.c:1892
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:857
WALOpenSegment seg
Definition xlogreader.h:271
static volatile sig_atomic_t got_SIGUSR2
Definition walsender.c:205
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5011

References ConditionVariableCancelSleep(), fb(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 3961 of file walsender.c.

3962{
3963 switch (state)
3964 {
3966 return "startup";
3967 case WALSNDSTATE_BACKUP:
3968 return "backup";
3970 return "catchup";
3972 return "streaming";
3974 return "stopping";
3975 }
3976 return "UNKNOWN";
3977}
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3878 of file walsender.c.

3879{
3880 int i;
3881
3882 for (i = 0; i < max_wal_senders; i++)
3883 {
3885 pid_t pid;
3886
3887 SpinLockAcquire(&walsnd->mutex);
3888 pid = walsnd->pid;
3889 SpinLockRelease(&walsnd->mutex);
3890
3891 if (pid == 0)
3892 continue;
3893
3895 }
3896}
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition procsignal.h:35

References fb(), i, INVALID_PROC_NUMBER, max_wal_senders, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4190 of file walsender.c.

4191{
4193
4194 /*
4195 * Don't send keepalive messages if timeouts are globally disabled or
4196 * we're doing something not partaking in timeouts.
4197 */
4199 return;
4200
4202 return;
4203
4204 /*
4205 * If half of wal_sender_timeout has lapsed without receiving any reply
4206 * from the standby, send a keep-alive message to the standby requesting
4207 * an immediate reply.
4208 */
4210 wal_sender_timeout / 2);
4212 {
4214
4215 /* Try to flush pending output to the client */
4216 if (pq_flush_if_writable() != 0)
4218 }
4219}

References fb(), InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3097 of file walsender.c.

3098{
3100
3101 Assert(walsnd != NULL);
3102
3103 MyWalSnd = NULL;
3104
3105 SpinLockAcquire(&walsnd->mutex);
3106 /* Mark WalSnd struct as no longer being in use. */
3107 walsnd->pid = 0;
3108 SpinLockRelease(&walsnd->mutex);
3109}

References Assert, fb(), MyWalSnd, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3715 of file walsender.c.

3716{
3717 got_SIGUSR2 = true;
3719}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2879 of file walsender.c.

2880{
2882
2883 /*
2884 * Initialize the last reply timestamp. That enables timeout processing
2885 * from here on.
2886 */
2889
2890 /*
2891 * Loop until we reach the end of this timeline or the client requests to
2892 * stop streaming.
2893 */
2894 for (;;)
2895 {
2896 /* Clear any already-pending wakeups */
2898
2900
2901 /* Process any requests or signals received recently */
2903 {
2904 ConfigReloadPending = false;
2907 }
2908
2909 /* Check for input from the client */
2911
2912 /*
2913 * If we have received CopyDone from the client, sent CopyDone
2914 * ourselves, and the output buffer is empty, it's time to exit
2915 * streaming.
2916 */
2919 break;
2920
2921 /*
2922 * If we don't have any pending data in the output buffer, try to send
2923 * some more. If there is some, we don't bother to call send_data
2924 * again until we've flushed it ... but we'd better assume we are not
2925 * caught up.
2926 */
2927 if (!pq_is_send_pending())
2928 send_data();
2929 else
2930 WalSndCaughtUp = false;
2931
2932 /* Try to flush pending output to the client */
2933 if (pq_flush_if_writable() != 0)
2935
2936 /* If nothing remains to be sent right now ... */
2938 {
2939 /*
2940 * If we're in catchup state, move to streaming. This is an
2941 * important state change for users to know about, since before
2942 * this point data loss might occur if the primary dies and we
2943 * need to failover to the standby. The state change is also
2944 * important for synchronous replication, since commits that
2945 * started to wait at that point might wait for some time.
2946 */
2948 {
2950 (errmsg_internal("\"%s\" has now caught up with upstream server",
2953 }
2954
2955 /*
2956 * When SIGUSR2 arrives, we send any outstanding logs up to the
2957 * shutdown checkpoint record (i.e., the latest record), wait for
2958 * them to be replicated to the standby, and exit. This may be a
2959 * normal termination at shutdown, or a promotion; the walsender
2960 * cannot tell which.
2961 */
2962 if (got_SIGUSR2)
2964 }
2965
2966 /* Check for replication timeout. */
2968
2969 /* Send keepalive if the time has come */
2971
2972 /*
2973 * Block if we have unsent data. XXX For logical replication, let
2974 * WalSndWaitForWal() handle any other blocking; idle receivers need
2975 * its additional actions. For physical replication, also block if
2976 * caught up; its send_data does not block.
2977 *
2978 * The IO statistics are reported in WalSndWaitForWal() for the
2979 * logical WAL senders.
2980 */
2984 {
2985 long sleeptime;
2986 int wakeEvents;
2988
2991 else
2992 wakeEvents = 0;
2993
2994 /*
2995 * Use fresh timestamp, not last_processing, to reduce the chance
2996 * of reaching wal_sender_timeout before sending a keepalive.
2997 */
3000
3001 if (pq_is_send_pending())
3003
3004 /* Report IO statistics, if needed */
3007 {
3008 pgstat_flush_io(false);
3010 last_flush = now;
3011 }
3012
3013 /* Sleep until something happens or we time out */
3015 }
3016 }
3017}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1781
char * application_name
Definition guc_tables.c:561
bool pgstat_flush_backend(bool nowait, bits32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition pgstat_io.c:175
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition walsender.c:103
static void WalSndDone(WalSndSendDataCallback send_data)
Definition walsender.c:3594

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), fb(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1550 of file walsender.c.

1551{
1552 /* can't have sync rep confused by sending the same LSN several times */
1553 if (!last_write)
1554 lsn = InvalidXLogRecPtr;
1555
1556 resetStringInfo(ctx->out);
1557
1559 pq_sendint64(ctx->out, lsn); /* dataStart */
1560 pq_sendint64(ctx->out, lsn); /* walEnd */
1561
1562 /*
1563 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1564 * reserve space here.
1565 */
1566 pq_sendint64(ctx->out, 0); /* sendtime */
1567}
#define PqReplMsg_WALData
Definition protocol.h:77

References fb(), InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), PqReplMsg_WALData, and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3670 of file walsender.c.

3671{
3672 int i;
3673
3674 for (i = 0; i < max_wal_senders; i++)
3675 {
3677
3678 SpinLockAcquire(&walsnd->mutex);
3679 if (walsnd->pid == 0)
3680 {
3681 SpinLockRelease(&walsnd->mutex);
3682 continue;
3683 }
3684 walsnd->needreload = true;
3685 SpinLockRelease(&walsnd->mutex);
3686 }
3687}

References fb(), i, max_wal_senders, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 3113 of file walsender.c.

3115{
3116 char path[MAXPGPATH];
3117
3118 /*-------
3119 * When reading from a historic timeline, and there is a timeline switch
3120 * within this segment, read from the WAL segment belonging to the new
3121 * timeline.
3122 *
3123 * For example, imagine that this server is currently on timeline 5, and
3124 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3125 * 0/13002088. In pg_wal, we have these files:
3126 *
3127 * ...
3128 * 000000040000000000000012
3129 * 000000040000000000000013
3130 * 000000050000000000000013
3131 * 000000050000000000000014
3132 * ...
3133 *
3134 * In this situation, when requested to send the WAL from segment 0x13, on
3135 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3136 * recovery prefers files from newer timelines, so if the segment was
3137 * restored from the archive on this server, the file belonging to the old
3138 * timeline, 000000040000000000000013, might not exist. Their contents are
3139 * equal up to the switchpoint, because at a timeline switch, the used
3140 * portion of the old segment is copied to the new file.
3141 */
3144 {
3146
3147 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3148 if (nextSegNo == endSegNo)
3150 }
3151
3152 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3153 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3154 if (state->seg.ws_file >= 0)
3155 return;
3156
3157 /*
3158 * If the file is not found, assume it's because the standby asked for a
3159 * too old WAL segment that has already been removed or recycled.
3160 */
3161 if (errno == ENOENT)
3162 {
3163 char xlogfname[MAXFNAMELEN];
3164 int save_errno = errno;
3165
3167 errno = save_errno;
3168 ereport(ERROR,
3170 errmsg("requested WAL segment %s has already been removed",
3171 xlogfname)));
3172 }
3173 else
3174 ereport(ERROR,
3176 errmsg("could not open file \"%s\": %m",
3177 path)));
3178}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition fd.c:1086
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fb(), MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3942 of file walsender.c.

3943{
3945
3947
3948 if (walsnd->state == state)
3949 return;
3950
3951 SpinLockAcquire(&walsnd->mutex);
3952 walsnd->state = state;
3953 SpinLockRelease(&walsnd->mutex);
3954}

References am_walsender, Assert, fb(), MyWalSnd, SpinLockAcquire, and SpinLockRelease.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3754 of file walsender.c.

3755{
3756 bool found;
3757 int i;
3758
3760 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3761
3762 if (!found)
3763 {
3764 /* First time through, so initialize */
3766
3767 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3769
3770 for (i = 0; i < max_wal_senders; i++)
3771 {
3773
3774 SpinLockInit(&walsnd->mutex);
3775 }
3776
3780 }
3781}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:389
#define SpinLockInit(lock)
Definition spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition walsender.c:3742

References ConditionVariableInit(), dlist_init(), fb(), i, max_wal_senders, MemSet, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3742 of file walsender.c.

3743{
3744 Size size = 0;
3745
3746 size = offsetof(WalSndCtlData, walsnds);
3747 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3748
3749 return size;
3750}
Size add_size(Size s1, Size s2)
Definition shmem.c:495
Size mul_size(Size s1, Size s2)
Definition shmem.c:510

References add_size(), fb(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 384 of file walsender.c.

385{
386 /*
387 * Reset whereToSendOutput to prevent ereport from attempting to send any
388 * more messages to the standby.
389 */
392
393 proc_exit(0);
394}
@ DestNone
Definition dest.h:87
CommandDest whereToSendOutput
Definition postgres.c:92

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3723 of file walsender.c.

3724{
3725 /* Set up signal handlers */
3727 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3728 pqsignal(SIGTERM, die); /* request shutdown */
3729 /* SIGQUIT handler was already set up by InitPostmasterChild */
3730 InitializeTimeouts(); /* establishes SIGALRM handler */
3733 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3734 * shutdown */
3735
3736 /* Reset some signals that are accepted by postmaster but not here */
3738}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
#define die(msg)
#define pqsignal
Definition port.h:547
void StatementCancelHandler(SIGNAL_ARGS)
Definition postgres.c:3062
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:677
void InitializeTimeouts(void)
Definition timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition walsender.c:3715
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGUSR2
Definition win32_port.h:171

References die, fb(), InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1673 of file walsender.c.

1675{
1676 static TimestampTz sendTime = 0;
1678 bool pending_writes = false;
1679 bool end_xact = ctx->end_xact;
1680
1681 /*
1682 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1683 * avoid flooding the lag tracker when we commit frequently.
1684 *
1685 * We don't have a mechanism to get the ack for any LSN other than end
1686 * xact LSN from the downstream. So, we track lag only for end of
1687 * transaction LSN.
1688 */
1689#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1690 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1692 {
1693 LagTrackerWrite(lsn, now);
1694 sendTime = now;
1695 }
1696
1697 /*
1698 * When skipping empty transactions in synchronous replication, we send a
1699 * keepalive message to avoid delaying such transactions.
1700 *
1701 * It is okay to check sync_standbys_status without lock here as in the
1702 * worst case we will just send an extra keepalive message when it is
1703 * really not required.
1704 */
1705 if (skipped_xact &&
1706 SyncRepRequested() &&
1707 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1708 {
1709 WalSndKeepalive(false, lsn);
1710
1711 /* Try to flush pending output to the client */
1712 if (pq_flush_if_writable() != 0)
1714
1715 /* If we have pending write here, make sure it's actually flushed */
1716 if (pq_is_send_pending())
1717 pending_writes = true;
1718 }
1719
1720 /*
1721 * Process pending writes if any or try to send a keepalive if required.
1722 * We don't need to try sending keep alive messages at the transaction end
1723 * as that will be done at a later point in time. This is required only
1724 * for large transactions where we don't send any changes to the
1725 * downstream and the receiver can timeout due to that.
1726 */
1727 if (pending_writes || (!end_xact &&
1729 wal_sender_timeout / 2)))
1731}
#define SyncRepRequested()
Definition syncrep.h:18
static void ProcessPendingWrites(void)
Definition walsender.c:1619
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition walsender.c:4228
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, fb(), GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3820 of file walsender.c.

3821{
3822 WaitEvent event;
3823
3825
3826 /*
3827 * We use a condition variable to efficiently wake up walsenders in
3828 * WalSndWakeup().
3829 *
3830 * Every walsender prepares to sleep on a shared memory CV. Note that it
3831 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3832 * waitlist), but does not actually wait on the CV (IOW, it never calls
3833 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3834 * waiting, because we also need to wait for socket events. The processes
3835 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3836 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3837 * walsenders come out of WaitEventSetWait().
3838 *
3839 * This approach is simple and efficient because one doesn't have to loop
3840 * through all the walsender slots, with a spinlock acquisition and
3841 * release for every iteration, just to wake up only the waiting
3842 * walsenders. It makes WalSndWakeup() callers' life easy.
3843 *
3844 * XXX: A desirable future improvement would be to add support for CVs
3845 * into WaitEventSetWait().
3846 *
3847 * And, we use separate shared memory CVs for physical and logical
3848 * walsenders for selective wake ups, see WalSndWakeup() for more details.
3849 *
3850 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3851 * until awakened by physical walsenders after the walreceiver confirms
3852 * the receipt of the LSN.
3853 */
3860
3861 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3862 (event.events & WL_POSTMASTER_DEATH))
3863 {
3865 proc_exit(1);
3866 }
3867
3869}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition libpq.h:63
WaitEventSet * FeBeWaitSet
Definition pqcomm.c:166
uint32 events
ReplicationKind kind
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, fb(), FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1823 of file walsender.c.

1824{
1825 int wakeEvents;
1826 uint32 wait_event = 0;
1829
1830 /*
1831 * Fast path to avoid acquiring the spinlock in case we already know we
1832 * have enough WAL available and all the standby servers have confirmed
1833 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1834 * if we're far behind.
1835 */
1838 return RecentFlushPtr;
1839
1840 /*
1841 * Within the loop, we wait for the necessary WALs to be flushed to disk
1842 * first, followed by waiting for standbys to catch up if there are enough
1843 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1844 */
1845 for (;;)
1846 {
1847 bool wait_for_standby_at_stop = false;
1848 long sleeptime;
1850
1851 /* Clear any already-pending wakeups */
1853
1855
1856 /* Process any requests or signals received recently */
1858 {
1859 ConfigReloadPending = false;
1862 }
1863
1864 /* Check for input from the client */
1866
1867 /*
1868 * If we're shutting down, trigger pending WAL to be written out,
1869 * otherwise we'd possibly end up waiting for WAL that never gets
1870 * written, because walwriter has shut down already.
1871 */
1872 if (got_STOPPING)
1874
1875 /*
1876 * To avoid the scenario where standbys need to catch up to a newer
1877 * WAL location in each iteration, we update our idea of the currently
1878 * flushed position only if we are not waiting for standbys to catch
1879 * up.
1880 */
1882 {
1883 if (!RecoveryInProgress())
1885 else
1887 }
1888
1889 /*
1890 * If postmaster asked us to stop and the standby slots have caught up
1891 * to the flushed position, don't wait anymore.
1892 *
1893 * It's important to do this check after the recomputation of
1894 * RecentFlushPtr, so we can send all remaining data before shutting
1895 * down.
1896 */
1897 if (got_STOPPING)
1898 {
1901 else
1902 break;
1903 }
1904
1905 /*
1906 * We only send regular messages to the client for full decoded
1907 * transactions, but synchronous replication and walsender shutdown
1908 * may be waiting for a later location. So, before sleeping, we
1909 * send a ping containing the flush location. If the receiver is
1910 * otherwise idle, this keepalive will trigger a reply. Processing the
1911 * reply will update these MyWalSnd locations.
1912 */
1913 if (MyWalSnd->flush < sentPtr &&
1914 MyWalSnd->write < sentPtr &&
1917
1918 /*
1919 * Exit the loop if we have already caught up and don't need to wait for
1920 * standby slots.
1921 */
1924 break;
1925
1926 /*
1927 * Waiting for new WAL or waiting for standbys to catch up. Since we
1928 * need to wait, we're now caught up.
1929 */
1930 WalSndCaughtUp = true;
1931
1932 /*
1933 * Try to flush any pending output to the client.
1934 */
1935 if (pq_flush_if_writable() != 0)
1937
1938 /*
1939 * If we have received CopyDone from the client, sent CopyDone
1940 * ourselves, and the output buffer is empty, it's time to exit
1941 * streaming, so fail the current WAL fetch request.
1942 */
1945 break;
1946
1947 /* die if timeout was reached */
1949
1950 /* Send keepalive if the time has come */
1952
1953 /*
1954 * Sleep until something happens or we time out. Also wait for the
1955 * socket becoming writable, if there's still pending output.
1956 * Otherwise we might sit on sendable output data while waiting for
1957 * new WAL to be generated. (But if we have nothing to send, we don't
1958 * want to wake on socket-writable.)
1959 */
1962
1964
1965 if (pq_is_send_pending())
1967
1968 Assert(wait_event != 0);
1969
1970 /* Report IO statistics, if needed */
1973 {
1974 pgstat_flush_io(false);
1976 last_flush = now;
1977 }
1978
1980 }
1981
1982 /* reactivate latch so WalSndLoop knows to continue */
1984 return RecentFlushPtr;
1985}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1795
bool XLogBackgroundFlush(void)
Definition xlog.c:2988

References Assert, CHECK_FOR_INTERRUPTS, ConfigReloadPending, fb(), WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsValid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3904 of file walsender.c.

3905{
3906 for (;;)
3907 {
3908 int i;
3909 bool all_stopped = true;
3910
3911 for (i = 0; i < max_wal_senders; i++)
3912 {
3914
3915 SpinLockAcquire(&walsnd->mutex);
3916
3917 if (walsnd->pid == 0)
3918 {
3919 SpinLockRelease(&walsnd->mutex);
3920 continue;
3921 }
3922
3923 if (walsnd->state != WALSNDSTATE_STOPPING)
3924 {
3925 all_stopped = false;
3926 SpinLockRelease(&walsnd->mutex);
3927 break;
3928 }
3929 SpinLockRelease(&walsnd->mutex);
3930 }
3931
3932 /* safe to leave if confirmation is done for all WAL senders */
3933 if (all_stopped)
3934 return;
3935
3936 pg_usleep(10000L); /* wait for 10 msec */
3937 }
3938}
void pg_usleep(long microsec)
Definition signal.c:53

References fb(), i, max_wal_senders, pg_usleep(), SpinLockAcquire, SpinLockRelease, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3799 of file walsender.c.

3800{
3801 /*
3802 * Wake up all the walsenders waiting on WAL being flushed or replayed
3803 * respectively. Note that waiting walsender would have prepared to sleep
3804 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3805 * before actually waiting.
3806 */
3807 if (physical)
3809
3810 if (logical)
3812}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1577 of file walsender.c.

1579{
1581
1582 /*
1583 * Fill the send timestamp last, so that it is taken as late as possible.
1584 * This is somewhat ugly, but the protocol is set as it's already used for
1585 * several releases by streaming physical replication.
1586 */
1590 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1591 tmpbuf.data, sizeof(int64));
1592
1593 /* output previously gathered data in a CopyData packet */
1595
1597
1598 /* Try to flush pending output to the client */
1599 if (pq_flush_if_writable() != 0)
1601
1602 /* Try taking fast path unless we get too close to walsender timeout. */
1604 wal_sender_timeout / 2) &&
1606 {
1607 return;
1608 }
1609
1610 /* If we have pending write here, go to slow path */
1612}

References CHECK_FOR_INTERRUPTS, StringInfoData::data, fb(), GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), PqMsg_CopyData, ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3501 of file walsender.c.

3502{
3503 XLogRecord *record;
3504 char *errm;
3505
3506 /*
3507 * We'll use the current flush point to determine whether we've caught up.
3508 * This variable is static in order to cache it across calls. Caching is
3509 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3510 * spinlock.
3511 */
3513
3514 /*
3515 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3516 * true in WalSndWaitForWal, if we're actually waiting. We also set to
3517 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3518 * didn't wait - i.e. when we're shutting down.
3519 */
3520 WalSndCaughtUp = false;
3521
3523
3524 /* xlog record was invalid */
3525 if (errm != NULL)
3526 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3527 errm);
3528
3529 if (record != NULL)
3530 {
3531 /*
3532 * Note the lack of any call to LagTrackerWrite() which is handled by
3533 * WalSndUpdateProgress which is called by output plugin through
3534 * logical decoding write api.
3535 */
3537
3539 }
3540
3541 /*
3542 * If first time through in this session, initialize flushPtr. Otherwise,
3543 * we only need to update flushPtr if EndRecPtr is past it.
3544 */
3547 {
3548 /*
3549 * For cascading logical WAL senders, we use the replay LSN instead of
3550 * the flush LSN, since logical decoding on a standby only processes
3551 * WAL that has been replayed. This distinction becomes particularly
3552 * important during shutdown, as new WAL is no longer replayed and the
3553 * last replayed LSN marks the furthest point up to which decoding can
3554 * proceed.
3555 */
3558 else
3560 }
3561
3562 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3564 WalSndCaughtUp = true;
3565
3566 /*
3567 * If we're caught up and have been requested to stop, have WalSndLoop()
3568 * terminate the connection in an orderly manner, after writing out all
3569 * the pending data.
3570 */
3572 got_SIGUSR2 = true;
3573
3574 /* Update shared memory status */
3575 {
3577
3578 SpinLockAcquire(&walsnd->mutex);
3579 walsnd->sentPtr = sentPtr;
3580 SpinLockRelease(&walsnd->mutex);
3581 }
3582}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition decode.c:88
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:390

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, fb(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), MyWalSnd, LogicalDecodingContext::reader, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3191 of file walsender.c.

3192{
3194 XLogRecPtr startptr;
3195 XLogRecPtr endptr;
3196 Size nbytes;
3197 XLogSegNo segno;
3199 Size rbytes;
3200
3201 /* If requested switch the WAL sender to the stopping state. */
3202 if (got_STOPPING)
3204
3206 {
3207 WalSndCaughtUp = true;
3208 return;
3209 }
3210
3211 /* Figure out how far we can safely send the WAL. */
3213 {
3214 /*
3215 * Streaming an old timeline that's in this server's history, but is
3216 * not the one we're currently inserting or replaying. It can be
3217 * streamed up to the point where we switched off that timeline.
3218 */
3220 }
3221 else if (am_cascading_walsender)
3222 {
3224
3225 /*
3226 * Streaming the latest timeline on a standby.
3227 *
3228 * Attempt to send all WAL that has already been replayed, so that we
3229 * know it's valid. If we're receiving WAL through streaming
3230 * replication, it's also OK to send any WAL that has been received
3231 * but not replayed.
3232 *
3233 * The timeline we're recovering from can change, or we can be
3234 * promoted. In either case, the current timeline becomes historic. We
3235 * need to detect that so that we don't try to stream past the point
3236 * where we switched to another timeline. We check for promotion or
3237 * timeline switch after calculating FlushPtr, to avoid a race
3238 * condition: if the timeline becomes historic just after we checked
3239 * that it was still current, it's still OK to stream it up to the
3240 * FlushPtr that was calculated before it became historic.
3241 */
3242 bool becameHistoric = false;
3243
3245
3246 if (!RecoveryInProgress())
3247 {
3248 /* We have been promoted. */
3250 am_cascading_walsender = false;
3251 becameHistoric = true;
3252 }
3253 else
3254 {
3255 /*
3256 * Still a cascading standby. But is the timeline we're sending
3257 * still the one recovery is recovering from?
3258 */
3260 becameHistoric = true;
3261 }
3262
3263 if (becameHistoric)
3264 {
3265 /*
3266 * The timeline we were sending has become historic. Read the
3267 * timeline history file of the new timeline to see where exactly
3268 * we forked off from the timeline we were sending.
3269 */
3270 List *history;
3271
3274
3277
3279
3281 }
3282 }
3283 else
3284 {
3285 /*
3286 * Streaming the current timeline on a primary.
3287 *
3288 * Attempt to send all data that's already been written out and
3289 * fsync'd to disk. We cannot go further than what's been written out
3290 * given the current implementation of WALRead(). And in any case
3291 * it's unsafe to send WAL that is not securely down to disk on the
3292 * primary: if the primary subsequently crashes and restarts, standbys
3293 * must not have applied any WAL that got lost on the primary.
3294 */
3296 }
3297
3298 /*
3299 * Record the current system time as an approximation of the time at which
3300 * this WAL location was written for the purposes of lag tracking.
3301 *
3302 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3303 * is flushed and we could get that time as well as the LSN when we call
3304 * GetFlushRecPtr() above (and likewise for the cascading standby
3305 * equivalent), but rather than putting any new code into the hot WAL path
3306 * it seems good enough to capture the time here. We should reach this
3307 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3308 * may take some time, we read the WAL flush pointer and take the time
3309 * very close together here so that we'll get a later position if it is
3310 * still moving.
3311 *
3312 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3313 * this gives us a cheap approximation for the WAL flush time for this
3314 * LSN.
3315 *
3316 * Note that the LSN is not necessarily the LSN for the data contained in
3317 * the present message; it's the end of the WAL, which might be further
3318 * ahead. All the lag tracking machinery cares about is finding out when
3319 * that arbitrary LSN is eventually reported as written, flushed and
3320 * applied, so that it can measure the elapsed time.
3321 */
3323
3324 /*
3325 * If this is a historic timeline and we've reached the point where we
3326 * forked to the next timeline, stop streaming.
3327 *
3328 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3329 * startup process will normally replay all WAL that has been received
3330 * from the primary, before promoting, but if the WAL streaming is
3331 * terminated at a WAL page boundary, the valid portion of the timeline
3332 * might end in the middle of a WAL record. We might've already sent the
3333 * first half of that partial WAL record to the cascading standby, so that
3334 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3335 * replay the partial WAL record either, so it can still follow our
3336 * timeline switch.
3337 */
3339 {
3340 /* close the current file. */
3341 if (xlogreader->seg.ws_file >= 0)
3343
3344 /* Send CopyDone */
3346 streamingDoneSending = true;
3347
3348 WalSndCaughtUp = true;
3349
3350 elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
3353 return;
3354 }
3355
3356 /* Do we have any work to do? */
3358 if (SendRqstPtr <= sentPtr)
3359 {
3360 WalSndCaughtUp = true;
3361 return;
3362 }
3363
3364 /*
3365 * Figure out how much to send in one message. If there's no more than
3366 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3367 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3368 *
3369 * The rounding is not only for performance reasons. Walreceiver relies on
3370 * the fact that we never split a WAL record across two messages. Since a
3371 * long WAL record is split at page boundary into continuation records,
3372 * page boundary is always a safe cut-off point. We also assume that
3373 * SendRqstPtr never points to the middle of a WAL record.
3374 */
3375 startptr = sentPtr;
3376 endptr = startptr;
3377 endptr += MAX_SEND_SIZE;
3378
3379 /* if we went beyond SendRqstPtr, back off */
3380 if (SendRqstPtr <= endptr)
3381 {
3382 endptr = SendRqstPtr;
3384 WalSndCaughtUp = false;
3385 else
3386 WalSndCaughtUp = true;
3387 }
3388 else
3389 {
3390 /* round down to page boundary. */
3391 endptr -= (endptr % XLOG_BLCKSZ);
3392 WalSndCaughtUp = false;
3393 }
3394
3395 nbytes = endptr - startptr;
3396 Assert(nbytes <= MAX_SEND_SIZE);
3397
3398 /*
3399 * OK to read and send the slice.
3400 */
3403
3404 pq_sendint64(&output_message, startptr); /* dataStart */
3405 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3406 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3407
3408 /*
3409 * Read the log directly into the output buffer to avoid extra memcpy
3410 * calls.
3411 */
3413
3414retry:
3415 /* attempt to read WAL from WAL buffers first */
3417 startptr, nbytes, xlogreader->seg.ws_tli);
3419 startptr += rbytes;
3420 nbytes -= rbytes;
3421
3422 /* now read the remaining WAL from WAL file */
3423 if (nbytes > 0 &&
3426 startptr,
3427 nbytes,
3428 xlogreader->seg.ws_tli, /* Pass the current TLI because
3429 * only WalSndSegmentOpen controls
3430 * whether new TLI is needed. */
3431 &errinfo))
3433
3434 /* See logical_read_xlog_page(). */
3435 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3437
3438 /*
3439 * During recovery, the currently-open WAL file might be replaced with the
3440 * file of the same name retrieved from archive. So we always need to
3441 * check what we read was valid after reading into the buffer. If it's
3442 * invalid, we try to open and read the file again.
3443 */
3445 {
3447 bool reload;
3448
3449 SpinLockAcquire(&walsnd->mutex);
3450 reload = walsnd->needreload;
3451 walsnd->needreload = false;
3452 SpinLockRelease(&walsnd->mutex);
3453
3454 if (reload && xlogreader->seg.ws_file >= 0)
3455 {
3457
3458 goto retry;
3459 }
3460 }
3461
3462 output_message.len += nbytes;
3464
3465 /*
3466 * Fill the send timestamp last, so that it is taken as late as possible.
3467 */
3470 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3471 tmpbuf.data, sizeof(int64));
3472
3474
3475 sentPtr = endptr;
3476
3477 /* Update shared memory status */
3478 {
3480
3481 SpinLockAcquire(&walsnd->mutex);
3482 walsnd->sentPtr = sentPtr;
3483 SpinLockRelease(&walsnd->mutex);
3484 }
3485
3486 /* Report progress of XLOG streaming in PS display */
3488 {
3489 char activitymsg[50];
3490
3491 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
3494 }
3495}
bool update_process_title
Definition ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition stringinfo.c:337
TimeLineID ws_tli
Definition xlogreader.h:49
WALSegmentContext segcxt
Definition xlogreader.h:270
#define MAX_SEND_SIZE
Definition walsender.c:114
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition xlog.c:1754

References am_cascading_walsender, Assert, CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), fb(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, MyWalSnd, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqMsg_CopyDone, PqReplMsg_WALData, readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 126 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 252 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 216 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

◆ replication_active

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 199 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 155 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 156 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 138 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader