PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessStandbyPSRequestMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void WalSndHandleConfigReload (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char *WalSndGetStateString (WalSndState state)
 
static Interval *offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData *WalSndCtl = NULL
 
WalSnd *MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState *xlogreader = NULL
 
static IncrementalBackupInfo *uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext *logical_decoding_ctx = NULL
 
static LagTracker *lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 226 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 114 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 103 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 258 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd *cmd)
static

Definition at line 1415 of file walsender.c.

1416{
1417 bool failover_given = false;
1418 bool two_phase_given = false;
1419 bool failover;
1420 bool two_phase;
1421
1422 /* Parse options */
1424 {
1425 if (strcmp(defel->defname, "failover") == 0)
1426 {
1427 if (failover_given)
1428 ereport(ERROR,
1430 errmsg("conflicting or redundant options")));
1431 failover_given = true;
1433 }
1434 else if (strcmp(defel->defname, "two_phase") == 0)
1435 {
1436 if (two_phase_given)
1437 ereport(ERROR,
1439 errmsg("conflicting or redundant options")));
1440 two_phase_given = true;
1442 }
1443 else
1444 elog(ERROR, "unrecognized option: %s", defel->defname);
1445 }
1446
1450}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static bool two_phase
static bool failover
static int fb(int x)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:949

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, failover, fb(), foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *cmd)
static

Definition at line 1194 of file walsender.c.

1195{
1196 const char *snapshot_name = NULL;
1197 char xloc[MAXFNAMELEN];
1198 char *slot_name;
1199 bool reserve_wal = false;
1200 bool two_phase = false;
1201 bool failover = false;
1205 TupleDesc tupdesc;
1206 Datum values[4];
1207 bool nulls[4] = {0};
1208
1210
1212 &failover);
1213
1214 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1215 {
1216 ReplicationSlotCreate(cmd->slotname, false,
1218 false, false, false);
1219
1220 if (reserve_wal)
1221 {
1223
1225
1226 /* Write this slot to disk if it's a permanent one. */
1227 if (!cmd->temporary)
1229 }
1230 }
1231 else
1232 {
1234 bool need_full_snapshot = false;
1235
1237
1239
1240 /*
1241 * Initially create persistent slot as ephemeral - that allows us to
1242 * nicely handle errors during initialization because it'll get
1243 * dropped if this transaction fails. We'll make it persistent at the
1244 * end. Temporary slots can be created as temporary from beginning as
1245 * they get dropped on error as well.
1246 */
1250
1251 /*
1252 * Do options check early so that we can bail before calling the
1253 * DecodingContextFindStartpoint which can take long time.
1254 */
1256 {
1257 if (IsTransactionBlock())
1258 ereport(ERROR,
1259 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1260 (errmsg("%s must not be called inside a transaction",
1261 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1262
1263 need_full_snapshot = true;
1264 }
1266 {
1267 if (!IsTransactionBlock())
1268 ereport(ERROR,
1269 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1270 (errmsg("%s must be called inside a transaction",
1271 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1272
1274 ereport(ERROR,
1275 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1276 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1277 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1278 if (!XactReadOnly)
1279 ereport(ERROR,
1280 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1281 (errmsg("%s must be called in a read-only transaction",
1282 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1283
1284 if (FirstSnapshotSet)
1285 ereport(ERROR,
1286 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1287 (errmsg("%s must be called before any query",
1288 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1289
1290 if (IsSubTransaction())
1291 ereport(ERROR,
1292 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1293 (errmsg("%s must not be called in a subtransaction",
1294 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1295
1296 need_full_snapshot = true;
1297 }
1298
1299 /*
1300 * Ensure the logical decoding is enabled before initializing the
1301 * logical decoding context.
1302 */
1305
1309 .segment_open = WalSndSegmentOpen,
1310 .segment_close = wal_segment_close),
1313
1314 /*
1315 * Signal that we don't need the timeout mechanism. We're just
1316 * creating the replication slot and don't yet accept feedback
1317 * messages or send keepalives. As we possibly need to wait for
1318 * further WAL the walsender would otherwise possibly be killed too
1319 * soon.
1320 */
1322
1323 /* build initial snapshot, might take a while */
1325
1326 /*
1327 * Export or use the snapshot if we've been asked to do so.
1328 *
1329 * NB. We will convert the snapbuild.c kind of snapshot to normal
1330 * snapshot when doing this.
1331 */
1333 {
1335 }
1337 {
1338 Snapshot snap;
1339
1342 }
1343
1344 /* don't need the decoding context anymore */
1346
1347 if (!cmd->temporary)
1349 }
1350
1351 snprintf(xloc, sizeof(xloc), "%X/%08X",
1353
1355
1356 /*----------
1357 * Need a tuple descriptor representing four columns:
1358 * - first field: the slot name
1359 * - second field: LSN at which we became consistent
1360 * - third field: exported snapshot's name
1361 * - fourth field: output plugin
1362 */
1363 tupdesc = CreateTemplateTupleDesc(4);
1364 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1365 TEXTOID, -1, 0);
1366 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1367 TEXTOID, -1, 0);
1368 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1369 TEXTOID, -1, 0);
1370 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1371 TEXTOID, -1, 0);
1372
1373 /* prepare for projection of tuples */
1375
1376 /* slot_name */
1377 slot_name = NameStr(MyReplicationSlot->data.name);
1378 values[0] = CStringGetTextDatum(slot_name);
1379
1380 /* consistent wal location */
1382
1383 /* snapshot name, or NULL if none */
1384 if (snapshot_name != NULL)
1386 else
1387 nulls[2] = true;
1388
1389 /* plugin, or NULL if none */
1390 if (cmd->plugin != NULL)
1392 else
1393 nulls[3] = true;
1394
1395 /* send it to dest */
1396 do_tup_output(tstate, values, nulls);
1398
1400}
int16 AttrNumber
Definition attnum.h:21
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
#define NameStr(name)
Definition c.h:765
#define Assert(condition)
Definition c.h:873
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition dest.c:113
@ DestRemoteSimple
Definition dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define false
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:668
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:624
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:321
void CheckLogicalDecodingRequirements(void)
Definition logical.c:111
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:204
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:305
#define NIL
Definition pg_list.h:68
#define snprintf
Definition port.h:260
uint64_t Datum
Definition postgres.h:70
@ REPLICATION_KIND_PHYSICAL
Definition replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition slot.c:378
void ReplicationSlotMarkDirty(void)
Definition slot.c:1173
void ReplicationSlotReserveWal(void)
Definition slot.c:1693
void ReplicationSlotPersist(void)
Definition slot.c:1190
ReplicationSlot * MyReplicationSlot
Definition slot.c:148
void ReplicationSlotSave(void)
Definition slot.c:1155
void ReplicationSlotRelease(void)
Definition slot.c:758
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:440
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition snapbuild.c:538
bool FirstSnapshotSet
Definition snapmgr.c:193
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
Definition snapmgr.c:1853
PGPROC * MyProc
Definition proc.c:67
ReplicationKind kind
Definition replnodes.h:56
struct SnapBuild * snapshot_builder
Definition logical.h:44
ReplicationSlotPersistentData data
Definition slot.h:210
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:165
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:901
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition walsender.c:1117
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition walsender.c:3124
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1577
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition walsender.c:1694
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition walsender.c:1044
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1550
static TimestampTz last_reply_timestamp
Definition walsender.c:187
CRSSnapshotAction
Definition walsender.h:21
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition walsender.h:22
bool XactReadOnly
Definition xact.c:83
int XactIsoLevel
Definition xact.c:80
bool IsSubTransaction(void)
Definition xact.c:5066
bool IsTransactionBlock(void)
Definition xact.c:4993
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), EnsureLogicalDecodingEnabled(), ereport, errmsg(), ERROR, failover, fb(), FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsLogicalDecodingEnabled(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *cmd)
static

Definition at line 1406 of file walsender.c.

1407{
1408 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1409}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:909

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *cmd_string)

Definition at line 2010 of file walsender.c.

2011{
2012 yyscan_t scanner;
2013 int parse_rc;
2014 Node *cmd_node;
2015 const char *cmdtag;
2017
2018 /* We save and re-use the cmd_context across calls */
2020
2021 /*
2022 * If WAL sender has been told that shutdown is getting close, switch its
2023 * status accordingly to handle the next replication commands correctly.
2024 */
2025 if (got_STOPPING)
2027
2028 /*
2029 * Throw error if in stopping mode. We need prevent commands that could
2030 * generate WAL while the shutdown checkpoint is being written. To be
2031 * safe, we just prohibit all new commands.
2032 */
2034 ereport(ERROR,
2036 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2037
2038 /*
2039 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2040 * command arrives. Clean up the old stuff if there's anything.
2041 */
2043
2045
2046 /*
2047 * Prepare to parse and execute the command.
2048 *
2049 * Because replication command execution can involve beginning or ending
2050 * transactions, we need a working context that will survive that, so we
2051 * make it a child of TopMemoryContext. That in turn creates a hazard of
2052 * long-lived memory leaks if we lose track of the working context. We
2053 * deal with that by creating it only once per walsender, and resetting it
2054 * for each new command. (Normally this reset is a no-op, but if the
2055 * prior exec_replication_command call failed with an error, it won't be.)
2056 *
2057 * This is subtler than it looks. The transactions we manage can extend
2058 * across replication commands, indeed SnapBuildClearExportedSnapshot
2059 * might have just ended one. Because transaction exit will revert to the
2060 * memory context that was current at transaction start, we need to be
2061 * sure that that context is still valid. That motivates re-using the
2062 * same cmd_context rather than making a new one each time.
2063 */
2064 if (cmd_context == NULL)
2066 "Replication command context",
2068 else
2070
2072
2074
2075 /*
2076 * Is it a WalSender command?
2077 */
2079 {
2080 /* Nope; clean up and get out. */
2082
2085
2086 /* XXX this is a pretty random place to make this check */
2087 if (MyDatabaseId == InvalidOid)
2088 ereport(ERROR,
2090 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2091
2092 /* Tell the caller that this wasn't a WalSender command. */
2093 return false;
2094 }
2095
2096 /*
2097 * Looks like a WalSender command, so parse it.
2098 */
2100 if (parse_rc != 0)
2101 ereport(ERROR,
2103 errmsg_internal("replication command parser returned %d",
2104 parse_rc)));
2106
2107 /*
2108 * Report query to various monitoring facilities. For this purpose, we
2109 * report replication commands just like SQL commands.
2110 */
2112
2114
2115 /*
2116 * Log replication command if log_replication_commands is enabled. Even
2117 * when it's disabled, log the command with DEBUG1 level for backward
2118 * compatibility.
2119 */
2121 (errmsg("received replication command: %s", cmd_string)));
2122
2123 /*
2124 * Disallow replication commands in aborted transaction blocks.
2125 */
2127 ereport(ERROR,
2129 errmsg("current transaction is aborted, "
2130 "commands ignored until end of transaction block")));
2131
2133
2134 /*
2135 * Allocate buffers that will be used for each outgoing and incoming
2136 * message. We do this just once per command to reduce palloc overhead.
2137 */
2141
2142 switch (cmd_node->type)
2143 {
2145 cmdtag = "IDENTIFY_SYSTEM";
2149 break;
2150
2152 cmdtag = "READ_REPLICATION_SLOT";
2156 break;
2157
2158 case T_BaseBackupCmd:
2159 cmdtag = "BASE_BACKUP";
2164 break;
2165
2167 cmdtag = "CREATE_REPLICATION_SLOT";
2171 break;
2172
2174 cmdtag = "DROP_REPLICATION_SLOT";
2178 break;
2179
2181 cmdtag = "ALTER_REPLICATION_SLOT";
2185 break;
2186
2188 {
2190
2191 cmdtag = "START_REPLICATION";
2194
2195 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2196 StartReplication(cmd);
2197 else
2199
2200 /* dupe, but necessary per libpqrcv_endstreaming */
2202
2204 break;
2205 }
2206
2208 cmdtag = "TIMELINE_HISTORY";
2213 break;
2214
2215 case T_VariableShowStmt:
2216 {
2219
2220 cmdtag = "SHOW";
2222
2223 /* syscache access needs a transaction environment */
2225 GetPGVariable(n->name, dest);
2228 }
2229 break;
2230
2232 cmdtag = "UPLOAD_MANIFEST";
2237 break;
2238
2239 default:
2240 elog(ERROR, "unrecognized replication command node tag: %u",
2241 cmd_node->type);
2242 }
2243
2244 /*
2245 * Done. Revert to caller's memory context, and clean out the cmd_context
2246 * to recover memory right away.
2247 */
2250
2251 /*
2252 * We need not update ps display or pg_stat_activity, because PostgresMain
2253 * will reset those to "idle". But we must reset debug_query_string to
2254 * ensure it doesn't become a dangling pointer.
2255 */
2257
2258 return true;
2259}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition basebackup.c:990
void * yyscan_t
Definition cubedata.h:65
void EndReplicationCommand(const char *commandTag)
Definition dest.c:205
int errmsg_internal(const char *fmt,...)
Definition elog.c:1170
#define LOG
Definition elog.h:31
#define DEBUG1
Definition elog.h:30
Oid MyDatabaseId
Definition globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition guc_funcs.c:408
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopMemoryContext
Definition mcxt.c:166
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const char * debug_query_string
Definition postgres.c:89
#define InvalidOid
static void set_ps_display(const char *activity)
Definition ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
void replication_scanner_finish(yyscan_t yyscanner)
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
void SnapBuildClearExportedSnapshot(void)
Definition snapbuild.c:599
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition nodes.h:135
ReplicationKind kind
Definition replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition walsender.c:1415
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition walsender.c:580
WalSnd * MyWalSnd
Definition walsender.c:120
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition walsender.c:481
static StringInfoData tmpbuf
Definition walsender.c:178
static void IdentifySystem(void)
Definition walsender.c:400
static StringInfoData reply_message
Definition walsender.c:177
void WalSndSetState(WalSndState state)
Definition walsender.c:3953
static StringInfoData output_message
Definition walsender.c:176
static void UploadManifest(void)
Definition walsender.c:670
static volatile sig_atomic_t got_STOPPING
Definition walsender.c:206
bool log_replication_commands
Definition walsender.c:133
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition walsender.c:1194
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition walsender.c:1457
static IncrementalBackupInfo * uploaded_manifest
Definition walsender.c:155
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition walsender.c:1406
static void StartReplication(StartReplicationCmd *cmd)
Definition walsender.c:812
static XLogReaderState * xlogreader
Definition walsender.c:145
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3669
void StartTransactionCommand(void)
Definition xact.c:3080
bool IsAbortedTransactionBlockState(void)
Definition xact.c:408
void CommitTransactionCommand(void)
Definition xact.c:3178

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, fb(), GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *tli)

Definition at line 3648 of file walsender.c.

3649{
3651 TimeLineID replayTLI;
3654 XLogRecPtr result;
3655
3657
3658 /*
3659 * We can safely send what's already been replayed. Also, if walreceiver
3660 * is streaming WAL from the same timeline, we can send anything that it
3661 * has streamed, but hasn't been replayed yet.
3662 */
3663
3665 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3666
3667 if (tli)
3668 *tli = replayTLI;
3669
3670 result = replayPtr;
3671 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3672 result = receivePtr;
3673
3674 return result;
3675}
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1881
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition walsender.c:124
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, fb(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t offset,
IncrementalBackupInfo ib 
)
static

Definition at line 736 of file walsender.c.

738{
739 int mtype;
740 int maxmsglen;
741
743
745 mtype = pq_getbyte();
746 if (mtype == EOF)
749 errmsg("unexpected EOF on client connection with an open transaction")));
750
751 switch (mtype)
752 {
753 case PqMsg_CopyData:
755 break;
756 case PqMsg_CopyDone:
757 case PqMsg_CopyFail:
758 case PqMsg_Flush:
759 case PqMsg_Sync:
761 break;
762 default:
765 errmsg("unexpected message type 0x%02X during COPY from stdin",
766 mtype)));
767 maxmsglen = 0; /* keep compiler quiet */
768 break;
769 }
770
771 /* Now collect the message body */
775 errmsg("unexpected EOF on client connection with an open transaction")));
777
778 /* Process the message */
779 switch (mtype)
780 {
781 case PqMsg_CopyData:
783 return true;
784
785 case PqMsg_CopyDone:
786 return false;
787
788 case PqMsg_Sync:
789 case PqMsg_Flush:
790 /* Ignore these while in CopyIn mode as we do elsewhere. */
791 return true;
792
793 case PqMsg_CopyFail:
796 errmsg("COPY from stdin failed: %s",
798 }
799
800 /* Not reached. */
801 Assert(false);
802 return false;
803}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition miscadmin.h:142
#define RESUME_CANCEL_INTERRUPTS()
Definition miscadmin.h:144
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pq_getmessage(StringInfo s, int maxlen)
Definition pqcomm.c:1203
int pq_getbyte(void)
Definition pqcomm.c:963
void pq_startmsgread(void)
Definition pqcomm.c:1141
const char * pq_getmsgstring(StringInfo msg)
Definition pqformat.c:578
#define PqMsg_CopyDone
Definition protocol.h:64
#define PqMsg_CopyData
Definition protocol.h:65
#define PqMsg_Sync
Definition protocol.h:27
#define PqMsg_CopyFail
Definition protocol.h:29
#define PqMsg_Flush
Definition protocol.h:24

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), errmsg(), ERROR, fb(), HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_CopyFail, PqMsg_Flush, PqMsg_Sync, and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3704 of file walsender.c.

3705{
3707
3708 /*
3709 * If replication has not yet started, die like with SIGTERM. If
3710 * replication is active, only set a flag and wake up the main loop. It
3711 * will send any outstanding WAL, wait for it to be replicated to the
3712 * standby, and then exit gracefully.
3713 */
3714 if (!replication_active)
3716 else
3717 got_STOPPING = true;
3718}
int MyProcPid
Definition globals.c:47
bool am_walsender
Definition walsender.c:123
static volatile sig_atomic_t replication_active
Definition walsender.c:214
#define kill(pid, sig)
Definition win32_port.h:490

References am_walsender, Assert, fb(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 400 of file walsender.c.

401{
402 char sysid[32];
403 char xloc[MAXFNAMELEN];
405 char *dbname = NULL;
408 TupleDesc tupdesc;
409 Datum values[4];
410 bool nulls[4] = {0};
411 TimeLineID currTLI;
412
413 /*
414 * Reply with a result set with one row, four columns. First col is system
415 * ID, second is timeline ID, third is current xlog location and the
416 * fourth contains the database name if we are connected to one.
417 */
418
421
424 logptr = GetStandbyFlushRecPtr(&currTLI);
425 else
426 logptr = GetFlushRecPtr(&currTLI);
427
428 snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
429
431 {
433
434 /* syscache access needs a transaction env. */
437 /* copy dbname out of TX context */
440 }
441
443
444 /* need a tuple descriptor representing four columns */
445 tupdesc = CreateTemplateTupleDesc(4);
446 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
447 TEXTOID, -1, 0);
448 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
449 INT8OID, -1, 0);
450 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
451 TEXTOID, -1, 0);
452 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
453 TEXTOID, -1, 0);
454
455 /* prepare for projection of tuples */
457
458 /* column 1: system identifier */
460
461 /* column 2: timeline */
462 values[1] = Int64GetDatum(currTLI);
463
464 /* column 3: wal location */
466
467 /* column 4: database name, or NULL if none */
468 if (dbname)
470 else
471 nulls[3] = true;
472
473 /* send it to dest */
474 do_tup_output(tstate, values, nulls);
475
477}
#define UINT64_FORMAT
Definition c.h:565
struct cursor * cur
Definition ecpg.c:29
char * get_database_name(Oid dbid)
Definition lsyscache.c:1242
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
static Datum Int64GetDatum(int64 X)
Definition postgres.h:423
char * dbname
Definition streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition walsender.c:3648
uint64 GetSystemIdentifier(void)
Definition xlog.c:4627
bool RecoveryInProgress(void)
Definition xlog.c:6460
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6625

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), fb(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 301 of file walsender.c.

302{
304
305 /* Create a per-walsender data structure in shared memory */
307
308 /* need resource owner for e.g. basebackups */
310
311 /*
312 * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
313 * a WAL sender process, postmaster will let us outlive the bgwriter and
314 * kill us last in the shutdown sequence, so we get a chance to stream all
315 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
316 * there's no going back, and we mustn't write any WAL records after this.
317 */
320
321 /*
322 * If the client didn't specify a database to connect to, show in PGPROC
323 * that our advertised xmin should affect vacuum horizons in all
324 * databases. This allows physical replication clients to send hot
325 * standby feedback that will delay vacuum cleanup in all databases.
326 */
328 {
334 }
335
336 /* Initialize empty timestamp buffer for lag tracking. */
338}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1266
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition proc.h:63
void CreateAuxProcessResourceOwner(void)
Definition resowner.c:996
PROC_HDR * ProcGlobal
Definition proc.c:79
TransactionId xmin
Definition proc.h:195
uint8 statusFlags
Definition proc.h:259
int pgxactoff
Definition proc.h:202
uint8 * statusFlags
Definition proc.h:403
#define InvalidTransactionId
Definition transam.h:31
static void InitWalSenderSlot(void)
Definition walsender.c:3032
static LagTracker * lag_tracker
Definition walsender.c:252

References am_cascading_walsender, Assert, CreateAuxProcessResourceOwner(), fb(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 3032 of file walsender.c.

3033{
3034 int i;
3035
3036 /*
3037 * WalSndCtl should be set up already (we inherit this by fork() or
3038 * EXEC_BACKEND mechanism from the postmaster).
3039 */
3040 Assert(WalSndCtl != NULL);
3041 Assert(MyWalSnd == NULL);
3042
3043 /*
3044 * Find a free walsender slot and reserve it. This must not fail due to
3045 * the prior check for free WAL senders in InitProcess().
3046 */
3047 for (i = 0; i < max_wal_senders; i++)
3048 {
3050
3051 SpinLockAcquire(&walsnd->mutex);
3052
3053 if (walsnd->pid != 0)
3054 {
3055 SpinLockRelease(&walsnd->mutex);
3056 continue;
3057 }
3058 else
3059 {
3060 /*
3061 * Found a free slot. Reserve it for us.
3062 */
3063 walsnd->pid = MyProcPid;
3064 walsnd->state = WALSNDSTATE_STARTUP;
3065 walsnd->sentPtr = InvalidXLogRecPtr;
3066 walsnd->needreload = false;
3067 walsnd->write = InvalidXLogRecPtr;
3068 walsnd->flush = InvalidXLogRecPtr;
3069 walsnd->apply = InvalidXLogRecPtr;
3070 walsnd->writeLag = -1;
3071 walsnd->flushLag = -1;
3072 walsnd->applyLag = -1;
3073 walsnd->sync_standby_priority = 0;
3074 walsnd->replyTime = 0;
3075
3076 /*
3077 * The kind assignment is done here and not in StartReplication()
3078 * and StartLogicalReplication(). Indeed, the logical walsender
3079 * needs to read WAL records (like snapshot of running
3080 * transactions) during the slot creation. So it needs to be woken
3081 * up based on its kind.
3082 *
3083 * The kind assignment could also be done in StartReplication(),
3084 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
3085 * seems better to set it on one place.
3086 */
3087 if (MyDatabaseId == InvalidOid)
3089 else
3091
3092 SpinLockRelease(&walsnd->mutex);
3093 /* don't need the lock anymore */
3094 MyWalSnd = walsnd;
3095
3096 break;
3097 }
3098 }
3099
3100 Assert(MyWalSnd != NULL);
3101
3102 /* Arrange to clean up at walsender exit */
3104}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
#define SpinLockRelease(lock)
Definition spin.h:61
#define SpinLockAcquire(lock)
Definition spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
int max_wal_senders
Definition walsender.c:129
static void WalSndKill(int code, Datum arg)
Definition walsender.c:3108
WalSndCtlData * WalSndCtl
Definition walsender.c:117
@ WALSNDSTATE_STARTUP

References Assert, fb(), i, InvalidOid, InvalidXLogRecPtr, max_wal_senders, MyDatabaseId, MyProcPid, MyWalSnd, on_shmem_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, SpinLockAcquire, SpinLockRelease, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, and WALSNDSTATE_STARTUP.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4297 of file walsender.c.

4298{
4299 TimestampTz time = 0;
4300
4301 /*
4302 * If 'lsn' has not passed the WAL position stored in the overflow entry,
4303 * return the elapsed time (in microseconds) since the saved local flush
4304 * time. If the flush time is in the future (due to clock drift), return
4305 * -1 to treat as no valid sample.
4306 *
4307 * Otherwise, switch back to using the buffer to control the read head and
4308 * compute the elapsed time. The read head is then reset to point to the
4309 * oldest entry in the buffer.
4310 */
4311 if (lag_tracker->read_heads[head] == -1)
4312 {
4313 if (lag_tracker->overflowed[head].lsn > lsn)
4314 return (now >= lag_tracker->overflowed[head].time) ?
4315 now - lag_tracker->overflowed[head].time : -1;
4316
4317 time = lag_tracker->overflowed[head].time;
4319 lag_tracker->read_heads[head] =
4321 }
4322
4323 /* Read all unread samples up to this LSN or end of buffer. */
4324 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4326 {
4328 lag_tracker->last_read[head] =
4330 lag_tracker->read_heads[head] =
4332 }
4333
4334 /*
4335 * If the lag tracker is empty, that means the standby has processed
4336 * everything we've ever sent so we should now clear 'last_read'. If we
4337 * didn't do that, we'd risk using a stale and irrelevant sample for
4338 * interpolation at the beginning of the next burst of WAL after a period
4339 * of idleness.
4340 */
4342 lag_tracker->last_read[head].time = 0;
4343
4344 if (time > now)
4345 {
4346 /* If the clock somehow went backwards, treat as not found. */
4347 return -1;
4348 }
4349 else if (time == 0)
4350 {
4351 /*
4352 * We didn't cross a time. If there is a future sample that we
4353 * haven't reached yet, and we've already reached at least one sample,
4354 * let's interpolate the local flushed time. This is mainly useful
4355 * for reporting a completely stuck apply position as having
4356 * increasing lag, since otherwise we'd have to wait for it to
4357 * eventually start moving again and cross one of our samples before
4358 * we can show the lag increasing.
4359 */
4361 {
4362 /* There are no future samples, so we can't interpolate. */
4363 return -1;
4364 }
4365 else if (lag_tracker->last_read[head].time != 0)
4366 {
4367 /* We can interpolate between last_read and the next sample. */
4368 double fraction;
4369 WalTimeSample prev = lag_tracker->last_read[head];
4371
4372 if (lsn < prev.lsn)
4373 {
4374 /*
4375 * Reported LSNs shouldn't normally go backwards, but it's
4376 * possible when there is a timeline change. Treat as not
4377 * found.
4378 */
4379 return -1;
4380 }
4381
4382 Assert(prev.lsn < next.lsn);
4383
4384 if (prev.time > next.time)
4385 {
4386 /* If the clock somehow went backwards, treat as not found. */
4387 return -1;
4388 }
4389
4390 /* See how far we are between the previous and next samples. */
4391 fraction =
4392 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4393
4394 /* Scale the local flush time proportionally. */
4395 time = (TimestampTz)
4396 ((double) prev.time + (next.time - prev.time) * fraction);
4397 }
4398 else
4399 {
4400 /*
4401 * We have only a future sample, implying that we were entirely
4402 * caught up and now there is a new burst of WAL and the
4403 * standby hasn't processed the first sample yet. Until the
4404 * standby reaches the future sample the best we can do is report
4405 * the hypothetical lag if that sample were to be replayed now.
4406 */
4408 }
4409 }
4410
4411 /* Return the elapsed time since local flush time in microseconds. */
4412 Assert(time != 0);
4413 return now - time;
4414}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
static int32 next
Definition blutils.c:225
int64 TimestampTz
Definition timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition walsender.c:232
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:234
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:235
int write_head
Definition walsender.c:233
WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:249
TimestampTz time
Definition walsender.c:222
XLogRecPtr lsn
Definition walsender.c:221
#define LAG_TRACKER_BUFFER_SIZE
Definition walsender.c:226

References Assert, LagTracker::buffer, fb(), lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4239 of file walsender.c.

4240{
4241 int new_write_head;
4242 int i;
4243
4244 if (!am_walsender)
4245 return;
4246
4247 /*
4248 * If the lsn hasn't advanced since last time, then do nothing. This way
4249 * we only record a new sample when new WAL has been written.
4250 */
4251 if (lag_tracker->last_lsn == lsn)
4252 return;
4253 lag_tracker->last_lsn = lsn;
4254
4255 /*
4256 * If advancing the write head of the circular buffer would crash into any
4257 * of the read heads, then the buffer is full. In other words, the
4258 * slowest reader (presumably apply) is the one that controls the release
4259 * of space.
4260 */
4262 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4263 {
4264 /*
4265 * If the buffer is full, move the slowest reader to a separate
4266 * overflow entry and free its space in the buffer so the write head
4267 * can advance.
4268 */
4270 {
4273 lag_tracker->read_heads[i] = -1;
4274 }
4275 }
4276
4277 /* Store a sample at the current write head position. */
4281}
XLogRecPtr last_lsn
Definition walsender.c:231
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27

References am_walsender, LagTracker::buffer, fb(), i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char cur_page 
)
static

Definition at line 1044 of file walsender.c.

1046{
1048 int count;
1050 XLogSegNo segno;
1051 TimeLineID currTLI;
1052
1053 /*
1054 * Make sure we have enough WAL available before retrieving the current
1055 * timeline.
1056 */
1058
1059 /* Fail if not enough (implies we are going to shut down) */
1061 return -1;
1062
1063 /*
1064 * Since logical decoding is also permitted on a standby server, we need
1065 * to check if the server is in recovery to decide how to get the current
1066 * timeline ID (so that it also covers the promotion or timeline change
1067 * cases). We must determine am_cascading_walsender after waiting for the
1068 * required WAL so that it is correct when the walsender wakes up after a
1069 * promotion.
1070 */
1072
1074 GetXLogReplayRecPtr(&currTLI);
1075 else
1076 currTLI = GetWALInsertionTimeLine();
1077
1079 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1080 sendTimeLine = state->currTLI;
1081 sendTimeLineValidUpto = state->currTLIValidUntil;
1082 sendTimeLineNextTLI = state->nextTLI;
1083
1085 count = XLOG_BLCKSZ; /* more than one block available */
1086 else
1087 count = flushptr - targetPagePtr; /* part of the page available */
1088
1089 /* now actually read the data, we know it's there */
1090 if (!WALRead(state,
1091 cur_page,
1093 count,
1094 currTLI, /* Pass the current TLI because only
1095 * WalSndSegmentOpen controls whether new TLI
1096 * is needed. */
1097 &errinfo))
1099
1100 /*
1101 * After reading into the buffer, check that what we read was valid. We do
1102 * this after reading, because even though the segment was present when we
1103 * opened it, it might get recycled or removed while we read it. The
1104 * read() succeeds in that case, but the data we tried to read might
1105 * already have been overwritten with new WAL records.
1106 */
1107 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1108 CheckXLogRemoved(segno, state->seg.ws_tli);
1109
1110 return count;
1111}
static TimeLineID sendTimeLine
Definition walsender.c:164
static bool sendTimeLineIsHistoric
Definition walsender.c:166
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition walsender.c:1844
static TimeLineID sendTimeLineNextTLI
Definition walsender.c:165
static XLogRecPtr sendTimeLineValidUpto
Definition walsender.c:167
TimeLineID GetWALInsertionTimeLine(void)
Definition xlog.c:6646
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition xlog.c:3764
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition xlogdefs.h:52
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 wait_event 
)
static

Definition at line 1784 of file walsender.c.

1785{
1786 int elevel = got_STOPPING ? ERROR : WARNING;
1787 bool failover_slot;
1788
1790
1791 /*
1792 * Note that after receiving the shutdown signal, an ERROR is reported if
1793 * any slots are dropped, invalidated, or inactive. This measure is taken
1794 * to prevent the walsender from waiting indefinitely.
1795 */
1797 {
1799 return true;
1800 }
1801
1802 *wait_event = 0;
1803 return false;
1804}
#define WARNING
Definition elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3083

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, fb(), got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 wait_event 
)
static

Definition at line 1816 of file walsender.c.

1818{
1819 /* Check if we need to wait for WALs to be flushed to disk */
1820 if (target_lsn > flushed_lsn)
1821 {
1823 return true;
1824 }
1825
1826 /* Check if the standby slots have caught up to the flushed position */
1828}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1784

References fb(), and NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 3991 of file walsender.c.

3992{
3993 Interval *result = palloc_object(Interval);
3994
3995 result->month = 0;
3996 result->day = 0;
3997 result->time = offset;
3998
3999 return result;
4000}
#define palloc_object(type)
Definition fe_memutils.h:74
int32 day
Definition timestamp.h:51
int32 month
Definition timestamp.h:52
TimeOffset time
Definition timestamp.h:49

References Interval::day, Interval::month, palloc_object, and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action,
bool two_phase,
bool failover 
)
static

Definition at line 1117 of file walsender.c.

1121{
1122 ListCell *lc;
1123 bool snapshot_action_given = false;
1124 bool reserve_wal_given = false;
1125 bool two_phase_given = false;
1126 bool failover_given = false;
1127
1128 /* Parse options */
1129 foreach(lc, cmd->options)
1130 {
1131 DefElem *defel = (DefElem *) lfirst(lc);
1132
1133 if (strcmp(defel->defname, "snapshot") == 0)
1134 {
1135 char *action;
1136
1138 ereport(ERROR,
1140 errmsg("conflicting or redundant options")));
1141
1143 snapshot_action_given = true;
1144
1145 if (strcmp(action, "export") == 0)
1147 else if (strcmp(action, "nothing") == 0)
1149 else if (strcmp(action, "use") == 0)
1151 else
1152 ereport(ERROR,
1154 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
1155 "CREATE_REPLICATION_SLOT", defel->defname, action)));
1156 }
1157 else if (strcmp(defel->defname, "reserve_wal") == 0)
1158 {
1160 ereport(ERROR,
1162 errmsg("conflicting or redundant options")));
1163
1164 reserve_wal_given = true;
1166 }
1167 else if (strcmp(defel->defname, "two_phase") == 0)
1168 {
1170 ereport(ERROR,
1172 errmsg("conflicting or redundant options")));
1173 two_phase_given = true;
1175 }
1176 else if (strcmp(defel->defname, "failover") == 0)
1177 {
1179 ereport(ERROR,
1181 errmsg("conflicting or redundant options")));
1182 failover_given = true;
1184 }
1185 else
1186 elog(ERROR, "unrecognized option: %s", defel->defname);
1187 }
1188}
char * defGetString(DefElem *def)
Definition define.c:34
#define lfirst(lc)
Definition pg_list.h:172
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), elog, ereport, errcode(), errmsg(), ERROR, failover, fb(), CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 4007 of file walsender.c.

4008{
4009#define PG_STAT_GET_WAL_SENDERS_COLS 12
4010 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4012 int num_standbys;
4013 int i;
4014
4015 InitMaterializedSRF(fcinfo, 0);
4016
4017 /*
4018 * Get the currently active synchronous standbys. This could be out of
4019 * date before we're done, but we'll use the data anyway.
4020 */
4022
4023 for (i = 0; i < max_wal_senders; i++)
4024 {
4028 XLogRecPtr flush;
4029 XLogRecPtr apply;
4030 TimeOffset writeLag;
4031 TimeOffset flushLag;
4032 TimeOffset applyLag;
4033 int priority;
4034 int pid;
4036 TimestampTz replyTime;
4037 bool is_sync_standby;
4039 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
4040 int j;
4041
4042 /* Collect data from shared memory */
4043 SpinLockAcquire(&walsnd->mutex);
4044 if (walsnd->pid == 0)
4045 {
4046 SpinLockRelease(&walsnd->mutex);
4047 continue;
4048 }
4049 pid = walsnd->pid;
4050 sent_ptr = walsnd->sentPtr;
4051 state = walsnd->state;
4052 write = walsnd->write;
4053 flush = walsnd->flush;
4054 apply = walsnd->apply;
4055 writeLag = walsnd->writeLag;
4056 flushLag = walsnd->flushLag;
4057 applyLag = walsnd->applyLag;
4058 priority = walsnd->sync_standby_priority;
4059 replyTime = walsnd->replyTime;
4060 SpinLockRelease(&walsnd->mutex);
4061
4062 /*
4063 * Detect whether walsender is/was considered synchronous. We can
4064 * provide some protection against stale data by checking the PID
4065 * along with walsnd_index.
4066 */
4067 is_sync_standby = false;
4068 for (j = 0; j < num_standbys; j++)
4069 {
4070 if (sync_standbys[j].walsnd_index == i &&
4071 sync_standbys[j].pid == pid)
4072 {
4073 is_sync_standby = true;
4074 break;
4075 }
4076 }
4077
4078 values[0] = Int32GetDatum(pid);
4079
4081 {
4082 /*
4083 * Only superusers and roles with privileges of pg_read_all_stats
4084 * can see details. Other users only get the pid value to know
4085 * it's a walsender, but no details.
4086 */
4087 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4088 }
4089 else
4090 {
4092
4094 nulls[2] = true;
4096
4098 nulls[3] = true;
4099 values[3] = LSNGetDatum(write);
4100
4101 if (!XLogRecPtrIsValid(flush))
4102 nulls[4] = true;
4103 values[4] = LSNGetDatum(flush);
4104
4105 if (!XLogRecPtrIsValid(apply))
4106 nulls[5] = true;
4107 values[5] = LSNGetDatum(apply);
4108
4109 /*
4110 * Treat a standby such as a pg_basebackup background process
4111 * which always returns an invalid flush location, as an
4112 * asynchronous standby.
4113 */
4114 priority = XLogRecPtrIsValid(flush) ? priority : 0;
4115
4116 if (writeLag < 0)
4117 nulls[6] = true;
4118 else
4120
4121 if (flushLag < 0)
4122 nulls[7] = true;
4123 else
4125
4126 if (applyLag < 0)
4127 nulls[8] = true;
4128 else
4130
4132
4133 /*
4134 * More easily understood version of standby state. This is purely
4135 * informational.
4136 *
4137 * In quorum-based sync replication, the role of each standby
4138 * listed in synchronous_standby_names can be changing very
4139 * frequently. Any standbys considered as "sync" at one moment can
4140 * be switched to "potential" ones at the next moment. So, it's
4141 * basically useless to report "sync" or "potential" as their sync
4142 * states. We report just "quorum" for them.
4143 */
4144 if (priority == 0)
4145 values[10] = CStringGetTextDatum("async");
4146 else if (is_sync_standby)
4148 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4149 else
4150 values[10] = CStringGetTextDatum("potential");
4151
4152 if (replyTime == 0)
4153 nulls[11] = true;
4154 else
4155 values[11] = TimestampTzGetDatum(replyTime);
4156 }
4157
4158 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4159 values, nulls);
4160 }
4161
4162 return (Datum) 0;
4163}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5284
#define MemSet(start, val, len)
Definition c.h:1013
int64 TimeOffset
Definition timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
#define write(a, b, c)
Definition win32.h:14
int j
Definition isn.c:78
Oid GetUserId(void)
Definition miscinit.c:469
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum Int32GetDatum(int32 X)
Definition postgres.h:222
uint8 syncrep_method
Definition syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:754
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition walsender.c:3991
static const char * WalSndGetStateString(WalSndState state)
Definition walsender.c:3972
WalSndState
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References CStringGetTextDatum, fb(), GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, SpinLockAcquire, SpinLockRelease, SYNC_REP_PRIORITY, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, write, and XLogRecPtrIsValid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2417 of file walsender.c.

2418{
2419 bool changed = false;
2421
2423 SpinLockAcquire(&slot->mutex);
2424 if (slot->data.restart_lsn != lsn)
2425 {
2426 changed = true;
2427 slot->data.restart_lsn = lsn;
2428 }
2429 SpinLockRelease(&slot->mutex);
2430
2431 if (changed)
2432 {
2436 }
2437
2438 /*
2439 * One could argue that the slot should be saved to disk now, but that'd
2440 * be energy wasted - the worst thing lost information could cause here is
2441 * to give wrong information in a statistics view - we'll just potentially
2442 * be more conservative in removing files.
2443 */
2444}
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1297
slock_t mutex
Definition slot.h:183
void PhysicalWakeupLogicalWalSnd(void)
Definition walsender.c:1759

References Assert, ReplicationSlot::data, fb(), ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, SpinLockRelease, and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2555 of file walsender.c.

2556{
2557 bool changed = false;
2559
2560 SpinLockAcquire(&slot->mutex);
2562
2563 /*
2564 * For physical replication we don't need the interlock provided by xmin
2565 * and effective_xmin since the consequences of a missed increase are
2566 * limited to query cancellations, so set both at once.
2567 */
2568 if (!TransactionIdIsNormal(slot->data.xmin) ||
2571 {
2572 changed = true;
2573 slot->data.xmin = feedbackXmin;
2575 }
2579 {
2580 changed = true;
2583 }
2584 SpinLockRelease(&slot->mutex);
2585
2586 if (changed)
2587 {
2590 }
2591}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1215
TransactionId catalog_xmin
Definition slot.h:122
TransactionId effective_catalog_xmin
Definition slot.h:207
TransactionId effective_xmin
Definition slot.h:206
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1759 of file walsender.c.

1760{
1762
1763 /*
1764 * If we are running in a standby, there is no need to wake up walsenders.
1765 * This is because we do not support syncing slots to cascading standbys,
1766 * so, there are no walsenders waiting for standbys to catch up.
1767 */
1768 if (RecoveryInProgress())
1769 return;
1770
1773}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3050
#define SlotIsPhysical(slot)
Definition slot.h:284
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1645 of file walsender.c.

1646{
1647 for (;;)
1648 {
1649 long sleeptime;
1650
1651 /* Check for input from the client */
1653
1654 /* die if timeout was reached */
1656
1657 /* Send keepalive if the time has come */
1659
1660 if (!pq_is_send_pending())
1661 break;
1662
1664
1665 /* Sleep until something happens or we time out */
1668
1669 /* Clear any already-pending wakeups */
1671
1673
1674 /* Process any requests or signals received recently */
1676
1677 /* Try to flush pending output to the client */
1678 if (pq_flush_if_writable() != 0)
1680 }
1681
1682 /* reactivate latch so WalSndLoop knows to continue */
1684}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
struct Latch * MyLatch
Definition globals.c:63
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
#define pq_flush_if_writable()
Definition libpq.h:47
#define pq_is_send_pending()
Definition libpq.h:48
#define WL_SOCKET_READABLE
#define WL_SOCKET_WRITEABLE
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition walsender.c:3831
static void WalSndCheckTimeOut(void)
Definition walsender.c:2868
static void ProcessRepliesIfAny(void)
Definition walsender.c:2266
static void WalSndKeepaliveIfNecessary(void)
Definition walsender.c:4201
static void WalSndHandleConfigReload(void)
Definition walsender.c:1622
static pg_noreturn void WalSndShutdown(void)
Definition walsender.c:384
static long WalSndComputeSleeptime(TimestampTz now)
Definition walsender.c:2824

References CHECK_FOR_INTERRUPTS, fb(), GetCurrentTimestamp(), MyLatch, pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), SetLatch(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2266 of file walsender.c.

2267{
2268 unsigned char firstchar;
2269 int maxmsglen;
2270 int r;
2271 bool received = false;
2272
2274
2275 /*
2276 * If we already received a CopyDone from the frontend, any subsequent
2277 * message is the beginning of a new command, and should be processed in
2278 * the main processing loop.
2279 */
2280 while (!streamingDoneReceiving)
2281 {
2284 if (r < 0)
2285 {
2286 /* unexpected error or EOF */
2289 errmsg("unexpected EOF on standby connection")));
2290 proc_exit(0);
2291 }
2292 if (r == 0)
2293 {
2294 /* no data available without blocking */
2295 pq_endmsgread();
2296 break;
2297 }
2298
2299 /* Validate message type and set packet size limit */
2300 switch (firstchar)
2301 {
2302 case PqMsg_CopyData:
2304 break;
2305 case PqMsg_CopyDone:
2306 case PqMsg_Terminate:
2308 break;
2309 default:
2310 ereport(FATAL,
2312 errmsg("invalid standby message type \"%c\"",
2313 firstchar)));
2314 maxmsglen = 0; /* keep compiler quiet */
2315 break;
2316 }
2317
2318 /* Read the message contents */
2321 {
2324 errmsg("unexpected EOF on standby connection")));
2325 proc_exit(0);
2326 }
2327
2328 /* ... and process it */
2329 switch (firstchar)
2330 {
2331 /*
2332 * PqMsg_CopyData means a standby reply wrapped in a CopyData
2333 * packet.
2334 */
2335 case PqMsg_CopyData:
2337 received = true;
2338 break;
2339
2340 /*
2341 * PqMsg_CopyDone means the standby requested to finish
2342 * streaming. Reply with CopyDone, if we had not sent that
2343 * already.
2344 */
2345 case PqMsg_CopyDone:
2347 {
2349 streamingDoneSending = true;
2350 }
2351
2353 received = true;
2354 break;
2355
2356 /*
2357 * PqMsg_Terminate means that the standby is closing down the
2358 * socket.
2359 */
2360 case PqMsg_Terminate:
2361 proc_exit(0);
2362
2363 default:
2364 Assert(false); /* NOT REACHED */
2365 }
2366 }
2367
2368 /*
2369 * Save the last reply timestamp if we've received at least one reply.
2370 */
2371 if (received)
2372 {
2375 }
2376}
#define COMMERROR
Definition elog.h:33
#define FATAL
Definition elog.h:41
void proc_exit(int code)
Definition ipc.c:105
#define pq_putmessage_noblock(msgtype, s, len)
Definition libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition pqcomm.c:1003
void pq_endmsgread(void)
Definition pqcomm.c:1165
#define PqMsg_Terminate
Definition protocol.h:28
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static bool waiting_for_ping_response
Definition walsender.c:190
static TimestampTz last_processing
Definition walsender.c:181
static bool streamingDoneSending
Definition walsender.c:198
static void ProcessStandbyMessage(void)
Definition walsender.c:2382
static bool streamingDoneReceiving
Definition walsender.c:199

References Assert, COMMERROR, ereport, errcode(), errmsg(), FATAL, fb(), GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2635 of file walsender.c.

2636{
2641 TimestampTz replyTime;
2642
2643 /*
2644 * Decipher the reply message. The caller already consumed the msgtype
2645 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2646 * of this message.
2647 */
2648 replyTime = pq_getmsgint64(&reply_message);
2653
2655 {
2656 char *replyTimeStr;
2657
2658 /* Copy because timestamptz_to_str returns a static buffer */
2660
2661 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2666 replyTimeStr);
2667
2669 }
2670
2671 /*
2672 * Update shared state for this WalSender process based on reply data from
2673 * standby.
2674 */
2675 {
2677
2678 SpinLockAcquire(&walsnd->mutex);
2679 walsnd->replyTime = replyTime;
2680 SpinLockRelease(&walsnd->mutex);
2681 }
2682
2683 /*
2684 * Unset WalSender's xmins if the feedback message values are invalid.
2685 * This happens when the downstream turned hot_standby_feedback off.
2686 */
2689 {
2691 if (MyReplicationSlot != NULL)
2693 return;
2694 }
2695
2696 /*
2697 * Check that the provided xmin/epoch are sane, that is, not in the future
2698 * and not so far back as to be already wrapped around. Ignore if not.
2699 */
2702 return;
2703
2706 return;
2707
2708 /*
2709 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2710 * the xmin will be taken into account by GetSnapshotData() /
2711 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2712 * thereby prevent the generation of cleanup conflicts on the standby
2713 * server.
2714 *
2715 * There is a small window for a race condition here: although we just
2716 * checked that feedbackXmin precedes nextXid, the nextXid could have
2717 * gotten advanced between our fetching it and applying the xmin below,
2718 * perhaps far enough to make feedbackXmin wrap around. In that case the
2719 * xmin we set here would be "in the future" and have no effect. No point
2720 * in worrying about this since it's too late to save the desired data
2721 * anyway. Assuming that the standby sends us an increasing sequence of
2722 * xmins, this could only happen during the first reply cycle, else our
2723 * own xmin would prevent nextXid from advancing so far.
2724 *
2725 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2726 * is assumed atomic, and there's no real need to prevent concurrent
2727 * horizon determinations. (If we're moving our xmin forward, this is
2728 * obviously safe, and if we're moving it backwards, well, the data is at
2729 * risk already since a VACUUM could already have determined the horizon.)
2730 *
2731 * If we're using a replication slot we reserve the xmin via that,
2732 * otherwise via the walsender's PGPROC entry. We can only track the
2733 * catalog xmin separately when using a slot, so we store the least of the
2734 * two provided when not using a slot.
2735 *
2736 * XXX: It might make sense to generalize the ephemeral slot concept and
2737 * always use the slot mechanism to handle the feedback xmin.
2738 */
2739 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2741 else
2742 {
2746 else
2748 }
2749}
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1862
uint32_t uint32
Definition c.h:546
uint32 TransactionId
Definition c.h:666
bool message_level_is_interesting(int elevel)
Definition elog.c:273
#define DEBUG2
Definition elog.h:29
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition walsender.c:2555
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition walsender.c:2604

References DEBUG2, elog, fb(), InvalidTransactionId, message_level_is_interesting(), MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2382 of file walsender.c.

2383{
2384 char msgtype;
2385
2386 /*
2387 * Check message type from the first byte.
2388 */
2390
2391 switch (msgtype)
2392 {
2395 break;
2396
2399 break;
2400
2403 break;
2404
2405 default:
2408 errmsg("unexpected message type \"%c\"", msgtype)));
2409 proc_exit(0);
2410 }
2411}
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static void ProcessStandbyHSFeedbackMessage(void)
Definition walsender.c:2635
static void ProcessStandbyPSRequestMessage(void)
Definition walsender.c:2755
static void ProcessStandbyReplyMessage(void)
Definition walsender.c:2450

References COMMERROR, ereport, errcode(), errmsg(), fb(), pq_getmsgbyte(), PqReplMsg_HotStandbyFeedback, PqReplMsg_PrimaryStatusRequest, PqReplMsg_StandbyStatusUpdate, proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyPSRequestMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyPSRequestMessage()

static void ProcessStandbyPSRequestMessage ( void  )
static

Definition at line 2755 of file walsender.c.

2756{
2763 TimestampTz replyTime;
2764
2765 /*
2766 * This shouldn't happen because we don't support getting primary status
2767 * message from standby.
2768 */
2769 if (RecoveryInProgress())
2770 elog(ERROR, "the primary status is unavailable during recovery");
2771
2772 replyTime = pq_getmsgint64(&reply_message);
2773
2774 /*
2775 * Update shared state for this WalSender process based on reply data from
2776 * standby.
2777 */
2778 SpinLockAcquire(&walsnd->mutex);
2779 walsnd->replyTime = replyTime;
2780 SpinLockRelease(&walsnd->mutex);
2781
2782 /*
2783 * Consider transactions in the current database, as only these are the
2784 * ones replicated.
2785 */
2788
2789 /*
2790 * Update the oldest xid for standby transmission if an older prepared
2791 * transaction exists and is currently in commit phase.
2792 */
2796
2800 lsn = GetXLogWriteRecPtr();
2801
2802 elog(DEBUG2, "sending primary status");
2803
2804 /* construct the message... */
2811
2812 /* ... and send it wrapped in CopyData */
2814}
int64_t int64
Definition c.h:543
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2830
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
Definition transam.h:443
#define U64FromFullTransactionId(x)
Definition transam.h:49
#define TransactionIdIsValid(xid)
Definition transam.h:41
TransactionId TwoPhaseGetOldestXidInCommit(void)
Definition twophase.c:2829
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:288
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:9614

References StringInfoData::data, DEBUG2, elog, ERROR, fb(), FullTransactionIdFromAllowableAt(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), GetXLogWriteRecPtr(), InvalidXLogRecPtr, StringInfoData::len, MyWalSnd, output_message, pq_getmsgint64(), pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_PrimaryStatusUpdate, ReadNextFullTransactionId(), RecoveryInProgress(), reply_message, resetStringInfo(), SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, TransactionIdPrecedes(), TwoPhaseGetOldestXidInCommit(), and U64FromFullTransactionId.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2450 of file walsender.c.

2451{
2453 flushPtr,
2454 applyPtr;
2455 bool replyRequested;
2456 TimeOffset writeLag,
2457 flushLag,
2458 applyLag;
2459 bool clearLagTimes;
2461 TimestampTz replyTime;
2462
2463 static bool fullyAppliedLastTime = false;
2464
2465 /* the caller already consumed the msgtype byte */
2469 replyTime = pq_getmsgint64(&reply_message);
2471
2473 {
2474 char *replyTimeStr;
2475
2476 /* Copy because timestamptz_to_str returns a static buffer */
2478
2479 elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
2483 replyRequested ? " (reply requested)" : "",
2484 replyTimeStr);
2485
2487 }
2488
2489 /* See if we can compute the round-trip lag for these positions. */
2494
2495 /*
2496 * If the standby reports that it has fully replayed the WAL in two
2497 * consecutive reply messages, then the second such message must result
2498 * from wal_receiver_status_interval expiring on the standby. This is a
2499 * convenient time to forget the lag times measured when it last
2500 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2501 * until more WAL traffic arrives.
2502 */
2503 clearLagTimes = false;
2504 if (applyPtr == sentPtr)
2505 {
2507 clearLagTimes = true;
2508 fullyAppliedLastTime = true;
2509 }
2510 else
2511 fullyAppliedLastTime = false;
2512
2513 /* Send a reply if the standby requested one. */
2514 if (replyRequested)
2516
2517 /*
2518 * Update shared state for this WalSender process based on reply data from
2519 * standby.
2520 */
2521 {
2523
2524 SpinLockAcquire(&walsnd->mutex);
2525 walsnd->write = writePtr;
2526 walsnd->flush = flushPtr;
2527 walsnd->apply = applyPtr;
2528 if (writeLag != -1 || clearLagTimes)
2529 walsnd->writeLag = writeLag;
2530 if (flushLag != -1 || clearLagTimes)
2531 walsnd->flushLag = flushLag;
2532 if (applyLag != -1 || clearLagTimes)
2533 walsnd->applyLag = applyLag;
2534 walsnd->replyTime = replyTime;
2535 SpinLockRelease(&walsnd->mutex);
2536 }
2537
2540
2541 /*
2542 * Advance our local xmin horizon when the client confirmed a flush.
2543 */
2545 {
2548 else
2550 }
2551}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition logical.c:1811
#define SlotIsLogical(slot)
Definition slot.h:285
void SyncRepReleaseWaiters(void)
Definition syncrep.c:474
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
static XLogRecPtr sentPtr
Definition walsender.c:173
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition walsender.c:2417
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition walsender.c:4178
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition walsender.c:4297

References am_cascading_walsender, DEBUG2, elog, fb(), GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 481 of file walsender.c.

482{
483#define READ_REPLICATION_SLOT_COLS 3
484 ReplicationSlot *slot;
487 TupleDesc tupdesc;
489 bool nulls[READ_REPLICATION_SLOT_COLS];
490
492 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
493 TEXTOID, -1, 0);
494 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
495 TEXTOID, -1, 0);
496 /* TimeLineID is unsigned, so int4 is not wide enough. */
497 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
498 INT8OID, -1, 0);
499
500 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
501
503 slot = SearchNamedReplicationSlot(cmd->slotname, false);
504 if (slot == NULL || !slot->in_use)
505 {
507 }
508 else
509 {
511 int i = 0;
512
513 /* Copy slot contents while holding spinlock */
514 SpinLockAcquire(&slot->mutex);
515 slot_contents = *slot;
516 SpinLockRelease(&slot->mutex);
518
519 if (OidIsValid(slot_contents.data.database))
522 errmsg("cannot use %s with a logical replication slot",
523 "READ_REPLICATION_SLOT"));
524
525 /* slot type */
526 values[i] = CStringGetTextDatum("physical");
527 nulls[i] = false;
528 i++;
529
530 /* start LSN */
531 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
532 {
533 char xloc[64];
534
535 snprintf(xloc, sizeof(xloc), "%X/%08X",
536 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
538 nulls[i] = false;
539 }
540 i++;
541
542 /* timeline this WAL was produced on */
543 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
544 {
548
549 /*
550 * While in recovery, use as timeline the currently-replaying one
551 * to get the LSN position's history.
552 */
553 if (RecoveryInProgress())
555 else
557
562 nulls[i] = false;
563 }
564 i++;
565
567 }
568
571 do_tup_output(tstate, values, nulls);
573}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition timeline.c:76
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition timeline.c:544
#define OidIsValid(objectId)
Definition c.h:788
@ LW_SHARED
Definition lwlock.h:113
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:540
Definition pg_list.h:54
bool in_use
Definition slot.h:186
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsValid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 580 of file walsender.c.

581{
583 TupleDesc tupdesc;
586 char path[MAXPGPATH];
587 int fd;
590 Size len;
591
593
594 /*
595 * Reply with a result set with one row, and two columns. The first col is
596 * the name of the history file, 2nd is the contents.
597 */
598 tupdesc = CreateTemplateTupleDesc(2);
599 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
600 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
601
603 TLHistoryFilePath(path, cmd->timeline);
604
605 /* Send a RowDescription message */
606 dest->rStartup(dest, CMD_SELECT, tupdesc);
607
608 /* Send a DataRow message */
610 pq_sendint16(&buf, 2); /* # of columns */
612 pq_sendint32(&buf, len); /* col1 len */
614
616 if (fd < 0)
619 errmsg("could not open file \"%s\": %m", path)));
620
621 /* Determine file length and send it to client */
623 if (histfilelen < 0)
626 errmsg("could not seek to end of file \"%s\": %m", path)));
627 if (lseek(fd, 0, SEEK_SET) != 0)
630 errmsg("could not seek to beginning of file \"%s\": %m", path)));
631
632 pq_sendint32(&buf, histfilelen); /* col2 len */
633
635 while (bytesleft > 0)
636 {
638 int nread;
639
641 nread = read(fd, rbuf.data, sizeof(rbuf));
643 if (nread < 0)
646 errmsg("could not read file \"%s\": %m",
647 path)));
648 else if (nread == 0)
651 errmsg("could not read file \"%s\": read %d of %zu",
652 path, nread, (Size) bytesleft)));
653
654 pq_sendbytes(&buf, rbuf.data, nread);
655 bytesleft -= nread;
656 }
657
658 if (CloseTransientFile(fd) != 0)
661 errmsg("could not close file \"%s\": %m", path)));
662
664}
#define PG_BINARY
Definition c.h:1287
size_t Size
Definition c.h:619
int errcode_for_file_access(void)
Definition elog.c:886
int CloseTransientFile(int fd)
Definition fd.c:2854
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2677
#define read(a, b, c)
Definition win32.h:13
@ CMD_SELECT
Definition nodes.h:275
#define ERRCODE_DATA_CORRUPTED
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition pqformat.h:136
static int fd(const char *x, int i)
#define PqMsg_DataRow
Definition protocol.h:43
TimeLineID timeline
Definition replnodes.h:120
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fb(), fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1457 of file walsender.c.

1458{
1460 QueryCompletion qc;
1461
1462 /* make sure that our requirements are still fulfilled */
1464
1466
1467 ReplicationSlotAcquire(cmd->slotname, true, true);
1468
1469 /*
1470 * Force a disconnect, so that the decoding code doesn't need to care
1471 * about an eventual switch from running in recovery, to running in a
1472 * normal environment. Client code is expected to handle reconnects.
1473 */
1475 {
1476 ereport(LOG,
1477 (errmsg("terminating walsender process after promotion")));
1478 got_STOPPING = true;
1479 }
1480
1481 /*
1482 * Create our decoding context, making it start at the previously ack'ed
1483 * position.
1484 *
1485 * Do this before sending a CopyBothResponse message, so that any errors
1486 * are reported early.
1487 */
1489 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1491 .segment_open = WalSndSegmentOpen,
1492 .segment_close = wal_segment_close),
1496
1498
1499 /* Send a CopyBothResponse message, and start streaming */
1501 pq_sendbyte(&buf, 0);
1502 pq_sendint16(&buf, 0);
1504 pq_flush();
1505
1506 /* Start reading WAL from the oldest required WAL. */
1509
1510 /*
1511 * Report the location after which we'll send out further commits as the
1512 * current sentPtr.
1513 */
1515
1516 /* Also update the sent position status in shared memory */
1520
1521 replication_active = true;
1522
1524
1525 /* Main loop of walsender */
1527
1530
1531 replication_active = false;
1532 if (got_STOPPING)
1533 proc_exit(0);
1535
1536 /* Get out of COPY mode (CommandComplete). */
1538 EndCommand(&qc, DestRemote, false);
1539}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition dest.c:169
@ DestRemote
Definition dest.h:89
#define pq_flush()
Definition libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:489
#define PqMsg_CopyBothResponse
Definition protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:620
XLogReaderState * reader
Definition logical.h:42
XLogRecPtr startpoint
Definition replnodes.h:97
slock_t mutex
XLogRecPtr sentPtr
void SyncRepInitConfig(void)
Definition syncrep.c:445
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition walsender.c:2895
static LogicalDecodingContext * logical_decoding_ctx
Definition walsender.c:216
static void XLogSendLogical(void)
Definition walsender.c:3512
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition xlogreader.c:232

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), fb(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 812 of file walsender.c.

813{
817
818 /* create xlogreader for physical replication */
819 xlogreader =
821 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
822 .segment_close = wal_segment_close),
823 NULL);
824
825 if (!xlogreader)
828 errmsg("out of memory"),
829 errdetail("Failed while allocating a WAL reading processor.")));
830
831 /*
832 * We assume here that we're logging enough information in the WAL for
833 * log-shipping, since this is checked in PostmasterMain().
834 *
835 * NOTE: wal_level can only change at shutdown, so in most cases it is
836 * difficult for there to be WAL data that we can still see that was
837 * written at wal_level='minimal'.
838 */
839
840 if (cmd->slotname)
841 {
842 ReplicationSlotAcquire(cmd->slotname, true, true);
846 errmsg("cannot use a logical replication slot for physical replication")));
847
848 /*
849 * We don't need to verify the slot's restart_lsn here; instead we
850 * rely on the caller requesting the starting point to use. If the
851 * WAL segment doesn't exist, we'll fail later.
852 */
853 }
854
855 /*
856 * Select the timeline. If it was given explicitly by the client, use
857 * that. Otherwise use the timeline of the last replayed record.
858 */
862 else
864
865 if (cmd->timeline != 0)
866 {
868
869 sendTimeLine = cmd->timeline;
870 if (sendTimeLine == FlushTLI)
871 {
874 }
875 else
876 {
878
880
881 /*
882 * Check that the timeline the client requested exists, and the
883 * requested start location is on that timeline.
884 */
889
890 /*
891 * Found the requested timeline in the history. Check that
892 * requested startpoint is on that timeline in our history.
893 *
894 * This is quite loose on purpose. We only check that we didn't
895 * fork off the requested timeline before the switchpoint. We
896 * don't check that we switched *to* it before the requested
897 * starting point. This is because the client can legitimately
898 * request to start replication from the beginning of the WAL
899 * segment that contains switchpoint, but on the new timeline, so
900 * that it doesn't end up with a partial segment. If you ask for
901 * too old a starting point, you'll get an error later when we
902 * fail to find the requested WAL segment in pg_wal.
903 *
904 * XXX: we could be more strict here and only allow a startpoint
905 * that's older than the switchpoint, if it's still in the same
906 * WAL segment.
907 */
909 switchpoint < cmd->startpoint)
910 {
912 errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
914 cmd->timeline),
915 errdetail("This server's history forked from timeline %u at %X/%08X.",
916 cmd->timeline,
918 }
920 }
921 }
922 else
923 {
927 }
928
930
931 /* If there is nothing to stream, don't even enter COPY mode */
933 {
934 /*
935 * When we first start replication the standby will be behind the
936 * primary. For some applications, for example synchronous
937 * replication, it is important to have a clear state for this initial
938 * catchup mode, so we can trigger actions when we change streaming
939 * state later. We may stay in this state for a long time, which is
940 * exactly why we want to be able to monitor whether or not we are
941 * still here.
942 */
944
945 /* Send a CopyBothResponse message, and start streaming */
947 pq_sendbyte(&buf, 0);
948 pq_sendint16(&buf, 0);
950 pq_flush();
951
952 /*
953 * Don't allow a request to stream from a future point in WAL that
954 * hasn't been flushed to disk in this server yet.
955 */
956 if (FlushPtr < cmd->startpoint)
957 {
959 errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
962 }
963
964 /* Start streaming from the requested point */
965 sentPtr = cmd->startpoint;
966
967 /* Initialize shared memory status, too */
971
973
974 /* Main loop of walsender */
975 replication_active = true;
976
978
979 replication_active = false;
980 if (got_STOPPING)
981 proc_exit(0);
983
985 }
986
987 if (cmd->slotname)
989
990 /*
991 * Copy is finished now. Send a single-row result set indicating the next
992 * timeline.
993 */
995 {
996 char startpos_str[8 + 1 + 8 + 1];
999 TupleDesc tupdesc;
1000 Datum values[2];
1001 bool nulls[2] = {0};
1002
1003 snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
1005
1007
1008 /*
1009 * Need a tuple descriptor representing two columns. int8 may seem
1010 * like a surprising data type for this, but in theory int4 would not
1011 * be wide enough for this, as TimeLineID is unsigned.
1012 */
1013 tupdesc = CreateTemplateTupleDesc(2);
1014 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1015 INT8OID, -1, 0);
1016 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1017 TEXTOID, -1, 0);
1018
1019 /* prepare for projection of tuple */
1021
1024
1025 /* send it to dest */
1026 do_tup_output(tstate, values, nulls);
1027
1029 }
1030
1031 /* Send CommandComplete message */
1032 EndReplicationCommand("START_STREAMING");
1033}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition timeline.c:572
int errdetail(const char *fmt,...)
Definition elog.c:1216
void list_free_deep(List *list)
Definition list.c:1560
TimeLineID timeline
Definition replnodes.h:96
static void XLogSendPhysical(void)
Definition walsender.c:3202
int wal_segment_size
Definition xlog.c:146
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition xlogreader.c:107

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, fb(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsValid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2604 of file walsender.c.

2605{
2607 TransactionId nextXid;
2609
2613
2614 if (xid <= nextXid)
2615 {
2616 if (epoch != nextEpoch)
2617 return false;
2618 }
2619 else
2620 {
2621 if (epoch + 1 != nextEpoch)
2622 return false;
2623 }
2624
2625 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2626 return false; /* epoch OK, but it's wrapped around */
2627
2628 return true;
2629}
#define EpochFromFullTransactionId(x)
Definition transam.h:47
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define XidFromFullTransactionId(x)
Definition transam.h:48
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, fb(), ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 670 of file walsender.c.

671{
672 MemoryContext mcxt;
674 off_t offset = 0;
676
677 /*
678 * parsing the manifest will use the cryptohash stuff, which requires a
679 * resource owner
680 */
685
686 /* Prepare to read manifest data into a temporary context. */
688 "incremental backup information",
691
692 /* Send a CopyInResponse message */
694 pq_sendbyte(&buf, 0);
695 pq_sendint16(&buf, 0);
697 pq_flush();
698
699 /* Receive packets from client until done. */
700 while (HandleUploadManifestPacket(&buf, &offset, ib))
701 ;
702
703 /* Finish up manifest processing. */
705
706 /*
707 * Discard any old manifest information and arrange to preserve the new
708 * information we just got.
709 *
710 * We assume that MemoryContextDelete and MemoryContextSetParent won't
711 * fail, and thus we shouldn't end up bailing out of here in such a way as
712 * to leave dangling pointers.
713 */
719
720 /* clean up the resource owner we created */
722}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:686
MemoryContext CacheMemoryContext
Definition mcxt.c:169
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
void pq_endmessage_reuse(StringInfo buf)
Definition pqformat.c:313
#define PqMsg_CopyInResponse
Definition protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition resowner.c:1016
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition walsender.c:736
static MemoryContext uploaded_manifest_mcxt
Definition walsender.c:156

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, fb(), FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2868 of file walsender.c.

2869{
2871
2872 /* don't bail out if we're doing something that doesn't require timeouts */
2873 if (last_reply_timestamp <= 0)
2874 return;
2875
2878
2880 {
2881 /*
2882 * Since typically expiration of replication timeout means
2883 * communication problem, we don't send the error message to the
2884 * standby.
2885 */
2887 (errmsg("terminating walsender process due to replication timeout")));
2888
2890 }
2891}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
int wal_sender_timeout
Definition walsender.c:131

References COMMERROR, ereport, errmsg(), fb(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2824 of file walsender.c.

2825{
2826 long sleeptime = 10000; /* 10 s */
2827
2829 {
2831
2832 /*
2833 * At the latest stop sleeping once wal_sender_timeout has been
2834 * reached.
2835 */
2838
2839 /*
2840 * If no ping has been sent yet, wakeup when it's time to do so.
2841 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2842 * the timeout passed without a response.
2843 */
2846 wal_sender_timeout / 2);
2847
2848 /* Compute relative time until wakeup. */
2850 }
2851
2852 return sleeptime;
2853}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757

References fb(), last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3605 of file walsender.c.

3606{
3608
3609 /* ... let's just be real sure we're caught up ... */
3610 send_data();
3611
3612 /*
3613 * To figure out whether all WAL has successfully been replicated, check
3614 * flush location if valid, write otherwise. Tools like pg_receivewal will
3615 * usually (unless in synchronous mode) return an invalid flush location.
3616 */
3619
3622 {
3623 QueryCompletion qc;
3624
3625 /* Inform the standby that XLOG streaming is done */
3627 EndCommand(&qc, DestRemote, false);
3628 pq_flush();
3629
3630 proc_exit(0);
3631 }
3634}
XLogRecPtr flush
XLogRecPtr write
static bool WalSndCaughtUp
Definition walsender.c:202

References DestRemote, EndCommand(), fb(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsValid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 348 of file walsender.c.

349{
354
355 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
357
358 if (MyReplicationSlot != NULL)
360
362
363 replication_active = false;
364
365 /*
366 * If there is a transaction in progress, it will clean up our
367 * ResourceOwner, but if a replication command set up a resource owner
368 * without a transaction, we've got to clean that up now.
369 */
372
374 proc_exit(0);
375
376 /* Revert back to startup state */
378}
void pgaio_error_cleanup(void)
Definition aio.c:1165
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition lwlock.c:1892
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:857
WALOpenSegment seg
Definition xlogreader.h:271
static volatile sig_atomic_t got_SIGUSR2
Definition walsender.c:205
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5011

References ConditionVariableCancelSleep(), fb(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 3972 of file walsender.c.

3973{
3974 switch (state)
3975 {
3977 return "startup";
3978 case WALSNDSTATE_BACKUP:
3979 return "backup";
3981 return "catchup";
3983 return "streaming";
3985 return "stopping";
3986 }
3987 return "UNKNOWN";
3988}
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndHandleConfigReload()

static void WalSndHandleConfigReload ( void  )
static

Definition at line 1622 of file walsender.c.

1623{
1625 return;
1626
1627 ConfigReloadPending = false;
1630
1631 /*
1632 * Recheck and release any now-satisfied waiters after config reload
1633 * changes synchronous replication requirements (e.g., reducing the number
1634 * of sync standbys or changing the standby names).
1635 */
1638}
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27

References am_cascading_walsender, ConfigReloadPending, PGC_SIGHUP, ProcessConfigFile(), SyncRepInitConfig(), and SyncRepReleaseWaiters().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3889 of file walsender.c.

3890{
3891 int i;
3892
3893 for (i = 0; i < max_wal_senders; i++)
3894 {
3896 pid_t pid;
3897
3898 SpinLockAcquire(&walsnd->mutex);
3899 pid = walsnd->pid;
3900 SpinLockRelease(&walsnd->mutex);
3901
3902 if (pid == 0)
3903 continue;
3904
3906 }
3907}
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition procsignal.h:35

References fb(), i, INVALID_PROC_NUMBER, max_wal_senders, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4201 of file walsender.c.

4202{
4204
4205 /*
4206 * Don't send keepalive messages if timeouts are globally disabled or
4207 * we're doing something not partaking in timeouts.
4208 */
4210 return;
4211
4213 return;
4214
4215 /*
4216 * If half of wal_sender_timeout has lapsed without receiving any reply
4217 * from the standby, send a keep-alive message to the standby requesting
4218 * an immediate reply.
4219 */
4221 wal_sender_timeout / 2);
4223 {
4225
4226 /* Try to flush pending output to the client */
4227 if (pq_flush_if_writable() != 0)
4229 }
4230}

References fb(), InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3108 of file walsender.c.

3109{
3111
3112 Assert(walsnd != NULL);
3113
3114 MyWalSnd = NULL;
3115
3116 SpinLockAcquire(&walsnd->mutex);
3117 /* Mark WalSnd struct as no longer being in use. */
3118 walsnd->pid = 0;
3119 SpinLockRelease(&walsnd->mutex);
3120}

References Assert, fb(), MyWalSnd, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3726 of file walsender.c.

3727{
3728 got_SIGUSR2 = true;
3730}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2895 of file walsender.c.

2896{
2898
2899 /*
2900 * Initialize the last reply timestamp. That enables timeout processing
2901 * from hereon.
2902 */
2905
2906 /*
2907 * Loop until we reach the end of this timeline or the client requests to
2908 * stop streaming.
2909 */
2910 for (;;)
2911 {
2912 /* Clear any already-pending wakeups */
2914
2916
2917 /* Process any requests or signals received recently */
2919
2920 /* Check for input from the client */
2922
2923 /*
2924 * If we have received CopyDone from the client, sent CopyDone
2925 * ourselves, and the output buffer is empty, it's time to exit
2926 * streaming.
2927 */
2930 break;
2931
2932 /*
2933 * If we don't have any pending data in the output buffer, try to send
2934 * some more. If there is some, we don't bother to call send_data
2935 * again until we've flushed it ... but we'd better assume we are not
2936 * caught up.
2937 */
2938 if (!pq_is_send_pending())
2939 send_data();
2940 else
2941 WalSndCaughtUp = false;
2942
2943 /* Try to flush pending output to the client */
2944 if (pq_flush_if_writable() != 0)
2946
2947 /* If nothing remains to be sent right now ... */
2949 {
2950 /*
2951 * If we're in catchup state, move to streaming. This is an
2952 * important state change for users to know about, since before
2953 * this point data loss might occur if the primary dies and we
2954 * need to failover to the standby. The state change is also
2955 * important for synchronous replication, since commits that
2956 * started to wait at that point might wait for some time.
2957 */
2959 {
2961 (errmsg_internal("\"%s\" has now caught up with upstream server",
2964 }
2965
2966 /*
2967 * When SIGUSR2 arrives, we send any outstanding logs up to the
2968 * shutdown checkpoint record (i.e., the latest record), wait for
2969 * them to be replicated to the standby, and exit. This may be a
2970 * normal termination at shutdown, or a promotion, the walsender
2971 * is not sure which.
2972 */
2973 if (got_SIGUSR2)
2975 }
2976
2977 /* Check for replication timeout. */
2979
2980 /* Send keepalive if the time has come */
2982
2983 /*
2984 * Block if we have unsent data. XXX For logical replication, let
2985 * WalSndWaitForWal() handle any other blocking; idle receivers need
2986 * its additional actions. For physical replication, also block if
2987 * caught up; its send_data does not block.
2988 *
2989 * The IO statistics are reported in WalSndWaitForWal() for the
2990 * logical WAL senders.
2991 */
2995 {
2996 long sleeptime;
2997 int wakeEvents;
2999
3002 else
3003 wakeEvents = 0;
3004
3005 /*
3006 * Use fresh timestamp, not last_processing, to reduce the chance
3007 * of reaching wal_sender_timeout before sending a keepalive.
3008 */
3011
3012 if (pq_is_send_pending())
3014
3015 /* Report IO statistics, if needed */
3018 {
3019 pgstat_flush_io(false);
3021 last_flush = now;
3022 }
3023
3024 /* Sleep until something happens or we time out */
3026 }
3027 }
3028}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1781
char * application_name
Definition guc_tables.c:570
bool pgstat_flush_backend(bool nowait, bits32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition pgstat_io.c:175
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition walsender.c:103
static void WalSndDone(WalSndSendDataCallback send_data)
Definition walsender.c:3605

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg_internal(), fb(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1550 of file walsender.c.

1551{
1552 /* can't have sync rep confused by sending the same LSN several times */
1553 if (!last_write)
1554 lsn = InvalidXLogRecPtr;
1555
1556 resetStringInfo(ctx->out);
1557
1559 pq_sendint64(ctx->out, lsn); /* dataStart */
1560 pq_sendint64(ctx->out, lsn); /* walEnd */
1561
1562 /*
1563 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1564 * reserve space here.
1565 */
1566 pq_sendint64(ctx->out, 0); /* sendtime */
1567}
#define PqReplMsg_WALData
Definition protocol.h:77

References fb(), InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), PqReplMsg_WALData, and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3681 of file walsender.c.

3682{
3683 int i;
3684
3685 for (i = 0; i < max_wal_senders; i++)
3686 {
3688
3689 SpinLockAcquire(&walsnd->mutex);
3690 if (walsnd->pid == 0)
3691 {
3692 SpinLockRelease(&walsnd->mutex);
3693 continue;
3694 }
3695 walsnd->needreload = true;
3696 SpinLockRelease(&walsnd->mutex);
3697 }
3698}

References fb(), i, max_wal_senders, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 3124 of file walsender.c.

3126{
3127 char path[MAXPGPATH];
3128
3129 /*-------
3130 * When reading from a historic timeline, and there is a timeline switch
3131 * within this segment, read from the WAL segment belonging to the new
3132 * timeline.
3133 *
3134 * For example, imagine that this server is currently on timeline 5, and
3135 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3136 * 0/13002088. In pg_wal, we have these files:
3137 *
3138 * ...
3139 * 000000040000000000000012
3140 * 000000040000000000000013
3141 * 000000050000000000000013
3142 * 000000050000000000000014
3143 * ...
3144 *
3145 * In this situation, when requested to send the WAL from segment 0x13, on
3146 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3147 * recovery prefers files from newer timelines, so if the segment was
3148 * restored from the archive on this server, the file belonging to the old
3149 * timeline, 000000040000000000000013, might not exist. Their contents are
3150 * equal up to the switchpoint, because at a timeline switch, the used
3151 * portion of the old segment is copied to the new file.
3152 */
3155 {
3157
3158 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3159 if (nextSegNo == endSegNo)
3161 }
3162
3163 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3164 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3165 if (state->seg.ws_file >= 0)
3166 return;
3167
3168 /*
3169 * If the file is not found, assume it's because the standby asked for a
3170 * too old WAL segment that has already been removed or recycled.
3171 */
3172 if (errno == ENOENT)
3173 {
3174 char xlogfname[MAXFNAMELEN];
3175 int save_errno = errno;
3176
3178 errno = save_errno;
3179 ereport(ERROR,
3181 errmsg("requested WAL segment %s has already been removed",
3182 xlogfname)));
3183 }
3184 else
3185 ereport(ERROR,
3187 errmsg("could not open file \"%s\": %m",
3188 path)));
3189}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition fd.c:1089
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, fb(), MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3953 of file walsender.c.

3954{
3956
3958
3959 if (walsnd->state == state)
3960 return;
3961
3962 SpinLockAcquire(&walsnd->mutex);
3963 walsnd->state = state;
3964 SpinLockRelease(&walsnd->mutex);
3965}

References am_walsender, Assert, fb(), MyWalSnd, SpinLockAcquire, and SpinLockRelease.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3765 of file walsender.c.

3766{
3767 bool found;
3768 int i;
3769
3771 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3772
3773 if (!found)
3774 {
3775 /* First time through, so initialize */
3777
3778 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3780
3781 for (i = 0; i < max_wal_senders; i++)
3782 {
3784
3785 SpinLockInit(&walsnd->mutex);
3786 }
3787
3791 }
3792}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
#define SpinLockInit(lock)
Definition spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition walsender.c:3753

References ConditionVariableInit(), dlist_init(), fb(), i, max_wal_senders, MemSet, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3753 of file walsender.c.

3754{
3755 Size size = 0;
3756
3757 size = offsetof(WalSndCtlData, walsnds);
3758 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3759
3760 return size;
3761}
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497

References add_size(), fb(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 384 of file walsender.c.

385{
386 /*
387 * Reset whereToSendOutput to prevent ereport from attempting to send any
388 * more messages to the standby.
389 */
392
393 proc_exit(0);
394}
@ DestNone
Definition dest.h:87
CommandDest whereToSendOutput
Definition postgres.c:92

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3734 of file walsender.c.

3735{
3736 /* Set up signal handlers */
3738 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3739 pqsignal(SIGTERM, die); /* request shutdown */
3740 /* SIGQUIT handler was already set up by InitPostmasterChild */
3741 InitializeTimeouts(); /* establishes SIGALRM handler */
3744 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3745 * shutdown */
3746
3747 /* Reset some signals that are accepted by postmaster but not here */
3749}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
#define die(msg)
#define pqsignal
Definition port.h:547
void StatementCancelHandler(SIGNAL_ARGS)
Definition postgres.c:3041
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:677
void InitializeTimeouts(void)
Definition timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition walsender.c:3726
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGUSR2
Definition win32_port.h:171

References die, fb(), InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1694 of file walsender.c.

1696{
1697 static TimestampTz sendTime = 0;
1699 bool pending_writes = false;
1700 bool end_xact = ctx->end_xact;
1701
1702 /*
1703 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1704 * avoid flooding the lag tracker when we commit frequently.
1705 *
1706 * We don't have a mechanism to get the ack for any LSN other than end
1707 * xact LSN from the downstream. So, we track lag only for end of
1708 * transaction LSN.
1709 */
1710#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1711 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1713 {
1714 LagTrackerWrite(lsn, now);
1715 sendTime = now;
1716 }
1717
1718 /*
1719 * When skipping empty transactions in synchronous replication, we send a
1720 * keepalive message to avoid delaying such transactions.
1721 *
1722 * It is okay to check sync_standbys_status without lock here as in the
1723 * worst case we will just send an extra keepalive message when it is
1724 * really not required.
1725 */
1726 if (skipped_xact &&
1727 SyncRepRequested() &&
1728 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1729 {
1730 WalSndKeepalive(false, lsn);
1731
1732 /* Try to flush pending output to the client */
1733 if (pq_flush_if_writable() != 0)
1735
1736 /* If we have pending write here, make sure it's actually flushed */
1737 if (pq_is_send_pending())
1738 pending_writes = true;
1739 }
1740
1741 /*
1742 * Process pending writes if any or try to send a keepalive if required.
1743 * We don't need to try sending keepalive messages at the transaction end
1744 * as that will be done at a later point in time. This is required only
1745 * for large transactions where we don't send any changes to the
1746 * downstream and the receiver can timeout due to that.
1747 */
1748 if (pending_writes || (!end_xact &&
1750 wal_sender_timeout / 2)))
1752}
#define SyncRepRequested()
Definition syncrep.h:18
static void ProcessPendingWrites(void)
Definition walsender.c:1645
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition walsender.c:4239
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, fb(), GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3831 of file walsender.c.

3832{
3833 WaitEvent event;
3834
3836
3837 /*
3838 * We use a condition variable to efficiently wake up walsenders in
3839 * WalSndWakeup().
3840 *
3841 * Every walsender prepares to sleep on a shared memory CV. Note that it
3842 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3843 * waitlist), but does not actually wait on the CV (IOW, it never calls
3844 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3845 * waiting, because we also need to wait for socket events. The processes
3846 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3847 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3848 * walsenders come out of WaitEventSetWait().
3849 *
3850 * This approach is simple and efficient because one doesn't have to loop
3851 * through all the walsenders slots, with a spinlock acquisition and
3852 * release for every iteration, just to wake up only the waiting
3853 * walsenders. It makes WalSndWakeup() callers' life easy.
3854 *
3855 * XXX: A desirable future improvement would be to add support for CVs
3856 * into WaitEventSetWait().
3857 *
3858 * And, we use separate shared memory CVs for physical and logical
3859 * walsenders for selective wake ups, see WalSndWakeup() for more details.
3860 *
3861 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3862 * until awakened by physical walsenders after the walreceiver confirms
3863 * the receipt of the LSN.
3864 */
3871
3872 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3873 (event.events & WL_POSTMASTER_DEATH))
3874 {
3876 proc_exit(1);
3877 }
3878
3880}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition libpq.h:63
WaitEventSet * FeBeWaitSet
Definition pqcomm.c:166
uint32 events
ReplicationKind kind
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, fb(), FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1844 of file walsender.c.

1845{
1846 int wakeEvents;
1847 uint32 wait_event = 0;
1850
1851 /*
1852 * Fast path to avoid acquiring the spinlock in case we already know we
1853 * have enough WAL available and all the standby servers have confirmed
1854 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1855 * if we're far behind.
1856 */
1859 return RecentFlushPtr;
1860
1861 /*
1862 * Within the loop, we wait for the necessary WALs to be flushed to disk
1863 * first, followed by waiting for standbys to catch up if there are enough
1864 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1865 */
1866 for (;;)
1867 {
1868 bool wait_for_standby_at_stop = false;
1869 long sleeptime;
1871
1872 /* Clear any already-pending wakeups */
1874
1876
1877 /* Process any requests or signals received recently */
1879
1880 /* Check for input from the client */
1882
1883 /*
1884 * If we're shutting down, trigger pending WAL to be written out,
1885 * otherwise we'd possibly end up waiting for WAL that never gets
1886 * written, because walwriter has shut down already.
1887 */
1888 if (got_STOPPING)
1890
1891 /*
1892 * To avoid the scenario where standbys need to catch up to a newer
1893 * WAL location in each iteration, we update our idea of the currently
1894 * flushed position only if we are not waiting for standbys to catch
1895 * up.
1896 */
1898 {
1899 if (!RecoveryInProgress())
1901 else
1903 }
1904
1905 /*
1906 * If postmaster asked us to stop and the standby slots have caught up
1907 * to the flushed position, don't wait anymore.
1908 *
1909 * It's important to do this check after the recomputation of
1910 * RecentFlushPtr, so we can send all remaining data before shutting
1911 * down.
1912 */
1913 if (got_STOPPING)
1914 {
1917 else
1918 break;
1919 }
1920
1921 /*
1922 * We only send regular messages to the client for full decoded
1923 * transactions, but a synchronous replication and walsender shutdown
1924 * possibly are waiting for a later location. So, before sleeping, we
1925 * send a ping containing the flush location. If the receiver is
1926 * otherwise idle, this keepalive will trigger a reply. Processing the
1927 * reply will update these MyWalSnd locations.
1928 */
1929 if (MyWalSnd->flush < sentPtr &&
1930 MyWalSnd->write < sentPtr &&
1933
1934 /*
1935 * Exit the loop if we have already caught up and don't need to wait for
1936 * standby slots.
1937 */
1940 break;
1941
1942 /*
1943 * Waiting for new WAL or waiting for standbys to catch up. Since we
1944 * need to wait, we're now caught up.
1945 */
1946 WalSndCaughtUp = true;
1947
1948 /*
1949 * Try to flush any pending output to the client.
1950 */
1951 if (pq_flush_if_writable() != 0)
1953
1954 /*
1955 * If we have received CopyDone from the client, sent CopyDone
1956 * ourselves, and the output buffer is empty, it's time to exit
1957 * streaming, so fail the current WAL fetch request.
1958 */
1961 break;
1962
1963 /* die if timeout was reached */
1965
1966 /* Send keepalive if the time has come */
1968
1969 /*
1970 * Sleep until something happens or we time out. Also wait for the
1971 * socket becoming writable, if there's still pending output.
1972 * Otherwise we might sit on sendable output data while waiting for
1973 * new WAL to be generated. (But if we have nothing to send, we don't
1974 * want to wake on socket-writable.)
1975 */
1978
1980
1981 if (pq_is_send_pending())
1983
1984 Assert(wait_event != 0);
1985
1986 /* Report IO statistics, if needed */
1989 {
1990 pgstat_flush_io(false);
1992 last_flush = now;
1993 }
1994
1996 }
1997
1998 /* reactivate latch so WalSndLoop knows to continue */
2000 return RecentFlushPtr;
2001}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1816
bool XLogBackgroundFlush(void)
Definition xlog.c:2988

References Assert, CHECK_FOR_INTERRUPTS, fb(), WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsValid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3915 of file walsender.c.

3916{
3917 for (;;)
3918 {
3919 int i;
3920 bool all_stopped = true;
3921
3922 for (i = 0; i < max_wal_senders; i++)
3923 {
3925
3926 SpinLockAcquire(&walsnd->mutex);
3927
3928 if (walsnd->pid == 0)
3929 {
3930 SpinLockRelease(&walsnd->mutex);
3931 continue;
3932 }
3933
3934 if (walsnd->state != WALSNDSTATE_STOPPING)
3935 {
3936 all_stopped = false;
3937 SpinLockRelease(&walsnd->mutex);
3938 break;
3939 }
3940 SpinLockRelease(&walsnd->mutex);
3941 }
3942
3943 /* safe to leave if confirmation is done for all WAL senders */
3944 if (all_stopped)
3945 return;
3946
3947 pg_usleep(10000L); /* wait for 10 msec */
3948 }
3949}
void pg_usleep(long microsec)
Definition signal.c:53

References fb(), i, max_wal_senders, pg_usleep(), SpinLockAcquire, SpinLockRelease, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3810 of file walsender.c.

3811{
3812 /*
3813 * Wake up all the walsenders waiting on WAL being flushed or replayed
3814 * respectively. Note that waiting walsender would have prepared to sleep
3815 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3816 * before actually waiting.
3817 */
3818 if (physical)
3820
3821 if (logical)
3823}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1577 of file walsender.c.

1579{
1581
1582 /*
1583 * Fill the send timestamp last, so that it is taken as late as possible.
1584 * This is somewhat ugly, but the protocol is set as it's already used for
1585 * several releases by streaming physical replication.
1586 */
1590 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1591 tmpbuf.data, sizeof(int64));
1592
1593 /* output previously gathered data in a CopyData packet */
1595
1597
1598 /* Try to flush pending output to the client */
1599 if (pq_flush_if_writable() != 0)
1601
1602 /* Try taking fast path unless we get too close to walsender timeout. */
1604 wal_sender_timeout / 2) &&
1606 {
1607 return;
1608 }
1609
1610 /* If we have pending write here, go to slow path */
1612}

References CHECK_FOR_INTERRUPTS, StringInfoData::data, fb(), GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), PqMsg_CopyData, ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3512 of file walsender.c.

3513{
3514 XLogRecord *record;
3515 char *errm;
3516
3517 /*
3518 * We'll use the current flush point to determine whether we've caught up.
3519 * This variable is static in order to cache it across calls. Caching is
3520 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3521 * spinlock.
3522 */
3524
3525 /*
3526 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3527 * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3528 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3529 * didn't wait - i.e. when we're shutting down.
3530 */
3531 WalSndCaughtUp = false;
3532
3534
3535 /* xlog record was invalid */
3536 if (errm != NULL)
3537 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3538 errm);
3539
3540 if (record != NULL)
3541 {
3542 /*
3543 * Note the lack of any call to LagTrackerWrite() which is handled by
3544 * WalSndUpdateProgress, which is called by the output plugin through the
3545 * logical decoding write API.
3546 */
3548
3550 }
3551
3552 /*
3553 * If first time through in this session, initialize flushPtr. Otherwise,
3554 * we only need to update flushPtr if EndRecPtr is past it.
3555 */
3558 {
3559 /*
3560 * For cascading logical WAL senders, we use the replay LSN instead of
3561 * the flush LSN, since logical decoding on a standby only processes
3562 * WAL that has been replayed. This distinction becomes particularly
3563 * important during shutdown, as new WAL is no longer replayed and the
3564 * last replayed LSN marks the furthest point up to which decoding can
3565 * proceed.
3566 */
3569 else
3571 }
3572
3573 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3575 WalSndCaughtUp = true;
3576
3577 /*
3578 * If we're caught up and have been requested to stop, have WalSndLoop()
3579 * terminate the connection in an orderly manner, after writing out all
3580 * the pending data.
3581 */
3583 got_SIGUSR2 = true;
3584
3585 /* Update shared memory status */
3586 {
3588
3589 SpinLockAcquire(&walsnd->mutex);
3590 walsnd->sentPtr = sentPtr;
3591 SpinLockRelease(&walsnd->mutex);
3592 }
3593}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition decode.c:88
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:390

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, fb(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), MyWalSnd, LogicalDecodingContext::reader, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3202 of file walsender.c.

3203{
3205 XLogRecPtr startptr;
3206 XLogRecPtr endptr;
3207 Size nbytes;
3208 XLogSegNo segno;
3210 Size rbytes;
3211
3212 /* If requested switch the WAL sender to the stopping state. */
3213 if (got_STOPPING)
3215
3217 {
3218 WalSndCaughtUp = true;
3219 return;
3220 }
3221
3222 /* Figure out how far we can safely send the WAL. */
3224 {
3225 /*
3226 * Streaming an old timeline that's in this server's history, but is
3227 * not the one we're currently inserting or replaying. It can be
3228 * streamed up to the point where we switched off that timeline.
3229 */
3231 }
3232 else if (am_cascading_walsender)
3233 {
3235
3236 /*
3237 * Streaming the latest timeline on a standby.
3238 *
3239 * Attempt to send all WAL that has already been replayed, so that we
3240 * know it's valid. If we're receiving WAL through streaming
3241 * replication, it's also OK to send any WAL that has been received
3242 * but not replayed.
3243 *
3244 * The timeline we're recovering from can change, or we can be
3245 * promoted. In either case, the current timeline becomes historic. We
3246 * need to detect that so that we don't try to stream past the point
3247 * where we switched to another timeline. We check for promotion or
3248 * timeline switch after calculating FlushPtr, to avoid a race
3249 * condition: if the timeline becomes historic just after we checked
3250 * that it was still current, it's still OK to stream it up to the
3251 * FlushPtr that was calculated before it became historic.
3252 */
3253 bool becameHistoric = false;
3254
3256
3257 if (!RecoveryInProgress())
3258 {
3259 /* We have been promoted. */
3261 am_cascading_walsender = false;
3262 becameHistoric = true;
3263 }
3264 else
3265 {
3266 /*
3267 * Still a cascading standby. But is the timeline we're sending
3268 * still the one recovery is recovering from?
3269 */
3271 becameHistoric = true;
3272 }
3273
3274 if (becameHistoric)
3275 {
3276 /*
3277 * The timeline we were sending has become historic. Read the
3278 * timeline history file of the new timeline to see where exactly
3279 * we forked off from the timeline we were sending.
3280 */
3281 List *history;
3282
3285
3288
3290
3292 }
3293 }
3294 else
3295 {
3296 /*
3297 * Streaming the current timeline on a primary.
3298 *
3299 * Attempt to send all data that's already been written out and
3300 * fsync'd to disk. We cannot go further than what's been written out
3301 * given the current implementation of WALRead(). And in any case
3302 * it's unsafe to send WAL that is not securely down to disk on the
3303 * primary: if the primary subsequently crashes and restarts, standbys
3304 * must not have applied any WAL that got lost on the primary.
3305 */
3307 }
3308
3309 /*
3310 * Record the current system time as an approximation of the time at which
3311 * this WAL location was written for the purposes of lag tracking.
3312 *
3313 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3314 * is flushed and we could get that time as well as the LSN when we call
3315 * GetFlushRecPtr() above (and likewise for the cascading standby
3316 * equivalent), but rather than putting any new code into the hot WAL path
3317 * it seems good enough to capture the time here. We should reach this
3318 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3319 * may take some time, we read the WAL flush pointer and take the time
3320 * very close together here so that we'll get a later position if it is
3321 * still moving.
3322 *
3323 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3324 * this gives us a cheap approximation for the WAL flush time for this
3325 * LSN.
3326 *
3327 * Note that the LSN is not necessarily the LSN for the data contained in
3328 * the present message; it's the end of the WAL, which might be further
3329 * ahead. All the lag tracking machinery cares about is finding out when
3330 * that arbitrary LSN is eventually reported as written, flushed and
3331 * applied, so that it can measure the elapsed time.
3332 */
3334
3335 /*
3336 * If this is a historic timeline and we've reached the point where we
3337 * forked to the next timeline, stop streaming.
3338 *
3339 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3340 * startup process will normally replay all WAL that has been received
3341 * from the primary, before promoting, but if the WAL streaming is
3342 * terminated at a WAL page boundary, the valid portion of the timeline
3343 * might end in the middle of a WAL record. We might've already sent the
3344 * first half of that partial WAL record to the cascading standby, so that
3345 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3346 * replay the partial WAL record either, so it can still follow our
3347 * timeline switch.
3348 */
3350 {
3351 /* close the current file. */
3352 if (xlogreader->seg.ws_file >= 0)
3354
3355 /* Send CopyDone */
3357 streamingDoneSending = true;
3358
3359 WalSndCaughtUp = true;
3360
3361 elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
3364 return;
3365 }
3366
3367 /* Do we have any work to do? */
3369 if (SendRqstPtr <= sentPtr)
3370 {
3371 WalSndCaughtUp = true;
3372 return;
3373 }
3374
3375 /*
3376 * Figure out how much to send in one message. If there's no more than
3377 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3378 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3379 *
3380 * The rounding is not only for performance reasons. Walreceiver relies on
3381 * the fact that we never split a WAL record across two messages. Since a
3382 * long WAL record is split at page boundary into continuation records,
3383 * page boundary is always a safe cut-off point. We also assume that
3384 * SendRqstPtr never points to the middle of a WAL record.
3385 */
3386 startptr = sentPtr;
3387 endptr = startptr;
3388 endptr += MAX_SEND_SIZE;
3389
3390 /* if we went beyond SendRqstPtr, back off */
3391 if (SendRqstPtr <= endptr)
3392 {
3393 endptr = SendRqstPtr;
3395 WalSndCaughtUp = false;
3396 else
3397 WalSndCaughtUp = true;
3398 }
3399 else
3400 {
3401 /* round down to page boundary. */
3402 endptr -= (endptr % XLOG_BLCKSZ);
3403 WalSndCaughtUp = false;
3404 }
3405
3406 nbytes = endptr - startptr;
3407 Assert(nbytes <= MAX_SEND_SIZE);
3408
3409 /*
3410 * OK to read and send the slice.
3411 */
3414
3415 pq_sendint64(&output_message, startptr); /* dataStart */
3416 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3417 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3418
3419 /*
3420 * Read the log directly into the output buffer to avoid extra memcpy
3421 * calls.
3422 */
3424
3425retry:
3426 /* attempt to read WAL from WAL buffers first */
3428 startptr, nbytes, xlogreader->seg.ws_tli);
3430 startptr += rbytes;
3431 nbytes -= rbytes;
3432
3433 /* now read the remaining WAL from WAL file */
3434 if (nbytes > 0 &&
3437 startptr,
3438 nbytes,
3439 xlogreader->seg.ws_tli, /* Pass the current TLI because
3440 * only WalSndSegmentOpen controls
3441 * whether new TLI is needed. */
3442 &errinfo))
3444
3445 /* See logical_read_xlog_page(). */
3446 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3448
3449 /*
3450 * During recovery, the currently-open WAL file might be replaced with the
3451 * file of the same name retrieved from archive. So we always need to
3452 * check what we read was valid after reading into the buffer. If it's
3453 * invalid, we try to open and read the file again.
3454 */
3456 {
3458 bool reload;
3459
3460 SpinLockAcquire(&walsnd->mutex);
3461 reload = walsnd->needreload;
3462 walsnd->needreload = false;
3463 SpinLockRelease(&walsnd->mutex);
3464
3465 if (reload && xlogreader->seg.ws_file >= 0)
3466 {
3468
3469 goto retry;
3470 }
3471 }
3472
3473 output_message.len += nbytes;
3475
3476 /*
3477 * Fill the send timestamp last, so that it is taken as late as possible.
3478 */
3481 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3482 tmpbuf.data, sizeof(int64));
3483
3485
3486 sentPtr = endptr;
3487
3488 /* Update shared memory status */
3489 {
3491
3492 SpinLockAcquire(&walsnd->mutex);
3493 walsnd->sentPtr = sentPtr;
3494 SpinLockRelease(&walsnd->mutex);
3495 }
3496
3497 /* Report progress of XLOG streaming in PS display */
3499 {
3500 char activitymsg[50];
3501
3502 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
3505 }
3506}
bool update_process_title
Definition ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition stringinfo.c:337
TimeLineID ws_tli
Definition xlogreader.h:49
WALSegmentContext segcxt
Definition xlogreader.h:270
#define MAX_SEND_SIZE
Definition walsender.c:114
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition xlog.c:1754

References am_cascading_walsender, Assert, CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), fb(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, MyWalSnd, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqMsg_CopyDone, PqReplMsg_WALData, readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 126 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 252 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 216 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

◆ replication_active

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 199 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 155 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 156 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 138 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader