PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 223 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 111 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 100 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 241 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd * cmd)
static

Definition at line 1391 of file walsender.c.

1392{
1393 bool failover_given = false;
1394 bool two_phase_given = false;
1395 bool failover;
1396 bool two_phase;
1397
1398 /* Parse options */
1399 foreach_ptr(DefElem, defel, cmd->options)
1400 {
1401 if (strcmp(defel->defname, "failover") == 0)
1402 {
1403 if (failover_given)
1404 ereport(ERROR,
1405 (errcode(ERRCODE_SYNTAX_ERROR),
1406 errmsg("conflicting or redundant options")));
1407 failover_given = true;
1408 failover = defGetBoolean(defel);
1409 }
1410 else if (strcmp(defel->defname, "two_phase") == 0)
1411 {
1412 if (two_phase_given)
1413 ereport(ERROR,
1414 (errcode(ERRCODE_SYNTAX_ERROR),
1415 errmsg("conflicting or redundant options")));
1416 two_phase_given = true;
1417 two_phase = defGetBoolean(defel);
1418 }
1419 else
1420 elog(ERROR, "unrecognized option: %s", defel->defname);
1421 }
1422
1423 ReplicationSlotAlter(cmd->slotname,
1424 failover_given ? &failover : NULL,
1425 two_phase_given ? &two_phase : NULL);
1426}
bool defGetBoolean(DefElem *def)
Definition: define.c:94
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:149
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
static bool failover
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:837

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, failover, foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1177 of file walsender.c.

1178{
1179 const char *snapshot_name = NULL;
1180 char xloc[MAXFNAMELEN];
1181 char *slot_name;
1182 bool reserve_wal = false;
1183 bool two_phase = false;
1184 bool failover = false;
1185 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1186 DestReceiver *dest;
1187 TupOutputState *tstate;
1188 TupleDesc tupdesc;
1189 Datum values[4];
1190 bool nulls[4] = {0};
1191
1192 Assert(!MyReplicationSlot);
1193
1194 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1195 &failover);
1196
1197 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1198 {
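1199 ReplicationSlotCreate(cmd->slotname, false,
1200 cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1201 false, false, false);
1202
1203 if (reserve_wal)
1204 {
1205 ReplicationSlotReserveWal();
1206
1207 ReplicationSlotMarkDirty();
1208
1209 /* Write this slot to disk if it's a permanent one. */
1210 if (!cmd->temporary)
1211 ReplicationSlotSave();
1212 }
1213 }
1214 else
1215 {
1216 LogicalDecodingContext *ctx;
1217 bool need_full_snapshot = false;
1218
1219 Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
1220
1221 CheckLogicalDecodingRequirements();
1222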
1223 /*
1224 * Initially create persistent slot as ephemeral - that allows us to
1225 * nicely handle errors during initialization because it'll get
1226 * dropped if this transaction fails. We'll make it persistent at the
1227 * end. Temporary slots can be created as temporary from beginning as
1228 * they get dropped on error as well.
1229 */
1230 ReplicationSlotCreate(cmd->slotname, true,
1231 cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1232 two_phase, failover, false);
1233
1234 /*
1235 * Do options check early so that we can bail before calling the
1236 * DecodingContextFindStartpoint which can take long time.
1237 */
1238 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1239 {
1240 if (IsTransactionBlock())
1241 ereport(ERROR,
1242 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1243 (errmsg("%s must not be called inside a transaction",
1244 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1245
1246 need_full_snapshot = true;
1247 }
1248 else if (snapshot_action == CRS_USE_SNAPSHOT)
1249 {
1250 if (!IsTransactionBlock())
1251 ereport(ERROR,
1252 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1253 (errmsg("%s must be called inside a transaction",
1254 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1255
1256 if (XactIsoLevel != XACT_REPEATABLE_READ)
1257 ereport(ERROR,
1258 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1259 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1260 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1261 if (!XactReadOnly)
1262 ereport(ERROR,
1263 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1264 (errmsg("%s must be called in a read-only transaction",
1265 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1266
1267 if (FirstSnapshotSet)
1268 ereport(ERROR,
1269 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1270 (errmsg("%s must be called before any query",
1271 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1272
1273 if (IsSubTransaction())
1274 ereport(ERROR,
1275 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1276 (errmsg("%s must not be called in a subtransaction",
1277 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1278
1279 need_full_snapshot = true;
1280 }
1281
1282 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1283 InvalidXLogRecPtr,
1284 XL_ROUTINE(.page_read = logical_read_xlog_page,
1285 .segment_open = WalSndSegmentOpen,
1286 .segment_close = wal_segment_close),
1287 WalSndPrepareWrite, WalSndWriteData,
1288 WalSndUpdateProgress);
1289
1290 /*
1291 * Signal that we don't need the timeout mechanism. We're just
1292 * creating the replication slot and don't yet accept feedback
1293 * messages or send keepalives. As we possibly need to wait for
1294 * further WAL the walsender would otherwise possibly be killed too
1295 * soon.
1296 */
1297 last_reply_timestamp = 0;
1298
1299 /* build initial snapshot, might take a while */
1300 DecodingContextFindStartpoint(ctx);
1301
1302 /*
1303 * Export or use the snapshot if we've been asked to do so.
1304 *
1305 * NB. We will convert the snapbuild.c kind of snapshot to normal
1306 * snapshot when doing this.
1307 */
1308 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1309 {
1310 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1311 }
1312 else if (snapshot_action == CRS_USE_SNAPSHOT)
1313 {
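1314 Snapshot snap;
1315
1316 snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1317 RestoreTransactionSnapshot(snap, MyProc);
1318 }
1319
1320 /* don't need the decoding context anymore */
1321 FreeDecodingContext(ctx);
1322
1323 if (!cmd->temporary)
1324 ReplicationSlotPersist();
1325 }
1326
1327 snprintf(xloc, sizeof(xloc), "%X/%X",
1328 LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1329
1330 dest = CreateDestReceiver(DestRemoteSimple);
1331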
1332 /*----------
1333 * Need a tuple descriptor representing four columns:
1334 * - first field: the slot name
1335 * - second field: LSN at which we became consistent
1336 * - third field: exported snapshot's name
1337 * - fourth field: output plugin
1338 */
1339 tupdesc = CreateTemplateTupleDesc(4);
1340 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1341 TEXTOID, -1, 0);
1342 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1343 TEXTOID, -1, 0);
1344 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1345 TEXTOID, -1, 0);
1346 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1347 TEXTOID, -1, 0);
1348
1349 /* prepare for projection of tuples */
1350 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1351
1352 /* slot_name */
1353 slot_name = NameStr(MyReplicationSlot->data.name);
1354 values[0] = CStringGetTextDatum(slot_name);
1355
1356 /* consistent wal location */
1357 values[1] = CStringGetTextDatum(xloc);
1358
1359 /* snapshot name, or NULL if none */
1360 if (snapshot_name != NULL)
1361 values[2] = CStringGetTextDatum(snapshot_name);
1362 else
1363 nulls[2] = true;
1364
1365 /* plugin, or NULL if none */
1366 if (cmd->plugin != NULL)
1367 values[3] = CStringGetTextDatum(cmd->plugin);
1368 else
1369 nulls[3] = true;
1370
1371 /* send it to dest */
1372 do_tup_output(tstate, values, nulls);
1373 end_tup_output(tstate);
1374
1375 ReplicationSlotRelease();
1376}
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:717
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2464
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2522
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2444
Assert(PointerIsAligned(start, uint64))
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:675
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:631
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:330
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:109
#define NIL
Definition: pg_list.h:68
#define snprintf
Definition: port.h:239
uintptr_t Datum
Definition: postgres.h:69
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:324
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1061
void ReplicationSlotReserveWal(void)
Definition: slot.c:1452
void ReplicationSlotPersist(void)
Definition: slot.c:1078
ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
void ReplicationSlotSave(void)
Definition: slot.c:1043
void ReplicationSlotRelease(void)
Definition: slot.c:686
@ RS_PERSISTENT
Definition: slot.h:38
@ RS_EPHEMERAL
Definition: slot.h:39
@ RS_TEMPORARY
Definition: slot.h:40
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:440
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
bool FirstSnapshotSet
Definition: snapmgr.c:192
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1843
PGPROC * MyProc
Definition: proc.c:67
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:111
ReplicationSlotPersistentData data
Definition: slot.h:185
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:175
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:911
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1100
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2993
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1553
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1649
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1027
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1526
static TimestampTz last_reply_timestamp
Definition: walsender.c:184
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:82
int XactIsoLevel
Definition: xact.c:79
bool IsSubTransaction(void)
Definition: xact.c:5044
bool IsTransactionBlock(void)
Definition: xact.c:4971
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References Assert(), begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, failover, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1382 of file walsender.c.

1383{
1384 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1385}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:814

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1970 of file walsender.c.

1971{
1972 yyscan_t scanner;
1973 int parse_rc;
1974 Node *cmd_node;
1975 const char *cmdtag;
1976 MemoryContext cmd_context;
1977 MemoryContext old_context;
1978
1979 /*
1980 * If WAL sender has been told that shutdown is getting close, switch its
1981 * status accordingly to handle the next replication commands correctly.
1982 */
1983 if (got_STOPPING)
1984 WalSndSetState(WALSNDSTATE_STOPPING);
1985
1986 /*
1987 * Throw error if in stopping mode. We need prevent commands that could
1988 * generate WAL while the shutdown checkpoint is being written. To be
1989 * safe, we just prohibit all new commands.
1990 */
1991 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1992 ereport(ERROR,
1993 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1994 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1995
1996 /*
1997 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1998 * command arrives. Clean up the old stuff if there's anything.
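1999 */
2000 SnapBuildClearExportedSnapshot();
2001
2002 CHECK_FOR_INTERRUPTS();
2003
2004 /*
2005 * Prepare to parse and execute the command.
2006 */
2007 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
2008 "Replication command context",
2009 ALLOCSET_DEFAULT_SIZES);
2010 old_context = MemoryContextSwitchTo(cmd_context);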
2011
2012 replication_scanner_init(cmd_string, &scanner);
2013
2014 /*
2015 * Is it a WalSender command?
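2016 */
2017 if (!replication_scanner_is_replication_command(scanner))
2018 {
2019 /* Nope; clean up and get out. */
2020 replication_scanner_finish(scanner);
2021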
2022 MemoryContextSwitchTo(old_context);
2023 MemoryContextDelete(cmd_context);
2024
2025 /* XXX this is a pretty random place to make this check */
2026 if (MyDatabaseId == InvalidOid)
2027 ereport(ERROR,
2028 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2029 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2030
2031 /* Tell the caller that this wasn't a WalSender command. */
2032 return false;
2033 }
2034
2035 /*
2036 * Looks like a WalSender command, so parse it.
2037 */
2038 parse_rc = replication_yyparse(&cmd_node, scanner);
2039 if (parse_rc != 0)
2040 ereport(ERROR,
2041 (errcode(ERRCODE_SYNTAX_ERROR),
2042 errmsg_internal("replication command parser returned %d",
2043 parse_rc)));
2044 replication_scanner_finish(scanner);
2045
2046 /*
2047 * Report query to various monitoring facilities. For this purpose, we
2048 * report replication commands just like SQL commands.
2049 */
2050 debug_query_string = cmd_string;
2051
2052 pgstat_report_activity(STATE_RUNNING, cmd_string);
2053
2054 /*
2055 * Log replication command if log_replication_commands is enabled. Even
2056 * when it's disabled, log the command with DEBUG1 level for backward
2057 * compatibility.
2058 */
2059 ereport(log_replication_commands ? LOG : DEBUG1,
2060 (errmsg("received replication command: %s", cmd_string)));
2061
2062 /*
2063 * Disallow replication commands in aborted transaction blocks.
2064 */
2065 if (IsAbortedTransactionBlockState())
2066 ereport(ERROR,
2067 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2068 errmsg("current transaction is aborted, "
2069 "commands ignored until end of transaction block")));
2070
2071 CHECK_FOR_INTERRUPTS();
2072
2073 /*
2074 * Allocate buffers that will be used for each outgoing and incoming
2075 * message. We do this just once per command to reduce palloc overhead.
2076 */
2077 initStringInfo(&output_message);
2078 initStringInfo(&reply_message);
2079 initStringInfo(&tmpbuf);
2080
2081 switch (cmd_node->type)
2082 {
2083 case T_IdentifySystemCmd:
2084 cmdtag = "IDENTIFY_SYSTEM";
2085 set_ps_display(cmdtag);
2086 IdentifySystem();
2087 EndReplicationCommand(cmdtag);
2088 break;
2089
2090 case T_ReadReplicationSlotCmd:
2091 cmdtag = "READ_REPLICATION_SLOT";
2092 set_ps_display(cmdtag);
2093 ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
2094 EndReplicationCommand(cmdtag);
2095 break;
2096
2097 case T_BaseBackupCmd:
2098 cmdtag = "BASE_BACKUP";
2099 set_ps_display(cmdtag);
2100 PreventInTransactionBlock(true, cmdtag);
2101 SendBaseBackup((BaseBackupCmd *) cmd_node, uploaded_manifest);
2102 EndReplicationCommand(cmdtag);
2103 break;
2104
2105 case T_CreateReplicationSlotCmd:
2106 cmdtag = "CREATE_REPLICATION_SLOT";
2107 set_ps_display(cmdtag);
2108 CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
2109 EndReplicationCommand(cmdtag);
2110 break;
2111
2112 case T_DropReplicationSlotCmd:
2113 cmdtag = "DROP_REPLICATION_SLOT";
2114 set_ps_display(cmdtag);
2115 DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
2116 EndReplicationCommand(cmdtag);
2117 break;
2118
2119 case T_AlterReplicationSlotCmd:
2120 cmdtag = "ALTER_REPLICATION_SLOT";
2121 set_ps_display(cmdtag);
2122 AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
2123 EndReplicationCommand(cmdtag);
2124 break;
2125
2126 case T_StartReplicationCmd:
2127 {
2128 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2129
2130 cmdtag = "START_REPLICATION";
2131 set_ps_display(cmdtag);
2132 PreventInTransactionBlock(true, cmdtag);
2133
2134 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2135 StartReplication(cmd);
2136 else
2137 StartLogicalReplication(cmd);
2138
2139 /* dupe, but necessary per libpqrcv_endstreaming */
2140 EndReplicationCommand(cmdtag);
2141
2142 Assert(xlogreader != NULL);
2143 break;
2144 }
2145
2146 case T_TimeLineHistoryCmd:
2147 cmdtag = "TIMELINE_HISTORY";
2148 set_ps_display(cmdtag);
2149 PreventInTransactionBlock(true, cmdtag);
2150 SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
2151 EndReplicationCommand(cmdtag);
2152 break;
2153
2154 case T_VariableShowStmt:
2155 {
2156 DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
2157 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2158
2159 cmdtag = "SHOW";
2160 set_ps_display(cmdtag);
2161
2162 /* syscache access needs a transaction environment */
2163 StartTransactionCommand();
2164 GetPGVariable(n->name, dest);
2165 CommitTransactionCommand();
2166 EndReplicationCommand(cmdtag);
2167 }
2168 break;
2169
2170 case T_UploadManifestCmd:
2171 cmdtag = "UPLOAD_MANIFEST";
2172 set_ps_display(cmdtag);
2173 PreventInTransactionBlock(true, cmdtag);
2174 UploadManifest();
2175 EndReplicationCommand(cmdtag);
2176 break;
2177
2178 default:
2179 elog(ERROR, "unrecognized replication command node tag: %u",
2180 cmd_node->type);
2181 }
2182
2183 /* done */
2184 MemoryContextSwitchTo(old_context);
2185 MemoryContextDelete(cmd_context);
2186
2187 /*
2188 * We need not update ps display or pg_stat_activity, because PostgresMain
2189 * will reset those to "idle". But we must reset debug_query_string to
2190 * ensure it doesn't become a dangling pointer.
2191 */
2192 debug_query_string = NULL;
2193
2194 return true;
2195}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:67
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:95
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:159
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:88
#define InvalidOid
Definition: postgres_ext.h:35
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1391
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:563
WalSnd * MyWalSnd
Definition: walsender.c:117
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:464
static StringInfoData tmpbuf
Definition: walsender.c:175
static void IdentifySystem(void)
Definition: walsender.c:383
static StringInfoData reply_message
Definition: walsender.c:174
void WalSndSetState(WalSndState state)
Definition: walsender.c:3814
static StringInfoData output_message
Definition: walsender.c:173
static void UploadManifest(void)
Definition: walsender.c:653
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:203
bool log_replication_commands
Definition: walsender.c:130
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1177
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1433
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:152
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1382
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:795
static XLogReaderState * xlogreader
Definition: walsender.c:142
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3648
void StartTransactionCommand(void)
Definition: xact.c:3059
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:407
void CommitTransactionCommand(void)
Definition: xact.c:3157

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)

Definition at line 3509 of file walsender.c.

3510{
3511 XLogRecPtr replayPtr;
3512 TimeLineID replayTLI;
3513 XLogRecPtr receivePtr;
3515 XLogRecPtr result;
3516
3518
3519 /*
3520 * We can safely send what's already been replayed. Also, if walreceiver
3521 * is streaming WAL from the same timeline, we can send anything that it
3522 * has streamed, but hasn't been replayed yet.
3523 */
3524
3525 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3526 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3527
3528 if (tli)
3529 *tli = replayTLI;
3530
3531 result = replayPtr;
3532 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3533 result = receivePtr;
3534
3535 return result;
3536}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1653
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:121
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:264
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *  offset,
IncrementalBackupInfo * ib 
)
static

Definition at line 719 of file walsender.c.

721{
722 int mtype;
723 int maxmsglen;
724
726
728 mtype = pq_getbyte();
729 if (mtype == EOF)
731 (errcode(ERRCODE_CONNECTION_FAILURE),
732 errmsg("unexpected EOF on client connection with an open transaction")));
733
734 switch (mtype)
735 {
736 case 'd': /* CopyData */
737 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
738 break;
739 case 'c': /* CopyDone */
740 case 'f': /* CopyFail */
741 case 'H': /* Flush */
742 case 'S': /* Sync */
743 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
744 break;
745 default:
747 (errcode(ERRCODE_PROTOCOL_VIOLATION),
748 errmsg("unexpected message type 0x%02X during COPY from stdin",
749 mtype)));
750 maxmsglen = 0; /* keep compiler quiet */
751 break;
752 }
753
754 /* Now collect the message body */
755 if (pq_getmessage(buf, maxmsglen))
757 (errcode(ERRCODE_CONNECTION_FAILURE),
758 errmsg("unexpected EOF on client connection with an open transaction")));
760
761 /* Process the message */
762 switch (mtype)
763 {
764 case 'd': /* CopyData */
765 AppendIncrementalManifestData(ib, buf->data, buf->len);
766 return true;
767
768 case 'c': /* CopyDone */
769 return false;
770
771 case 'H': /* Flush */
772 case 'S': /* Sync */
773 /* Ignore these while in CopyOut mode as we do elsewhere. */
774 return true;
775
776 case 'f':
778 (errcode(ERRCODE_QUERY_CANCELED),
779 errmsg("COPY from stdin failed: %s",
781 }
782
783 /* Not reached. */
784 Assert(false);
785 return false;
786}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:142
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:144
static char * buf
Definition: pg_test_fsync.c:72
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1204
int pq_getbyte(void)
Definition: pqcomm.c:964
void pq_startmsgread(void)
Definition: pqcomm.c:1142
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579

References AppendIncrementalManifestData(), Assert(), buf, ereport, errcode(), errmsg(), ERROR, HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3565 of file walsender.c.

3566{
3568
3569 /*
3570 * If replication has not yet started, die like with SIGTERM. If
3571 * replication is active, only set a flag and wake up the main loop. It
3572 * will send any outstanding WAL, wait for it to be replicated to the
3573 * standby, and then exit gracefully.
3574 */
3575 if (!replication_active)
3576 kill(MyProcPid, SIGTERM);
3577 else
3578 got_STOPPING = true;
3579}
int MyProcPid
Definition: globals.c:48
bool am_walsender
Definition: walsender.c:120
static volatile sig_atomic_t replication_active
Definition: walsender.c:211
#define kill(pid, sig)
Definition: win32_port.h:493

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 383 of file walsender.c.

384{
385 char sysid[32];
386 char xloc[MAXFNAMELEN];
387 XLogRecPtr logptr;
388 char *dbname = NULL;
390 TupOutputState *tstate;
391 TupleDesc tupdesc;
392 Datum values[4];
393 bool nulls[4] = {0};
394 TimeLineID currTLI;
395
396 /*
397 * Reply with a result set with one row, four columns. First col is system
398 * ID, second is timeline ID, third is current xlog location and the
399 * fourth contains the database name if we are connected to one.
400 */
401
402 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
404
407 logptr = GetStandbyFlushRecPtr(&currTLI);
408 else
409 logptr = GetFlushRecPtr(&currTLI);
410
411 snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
412
414 {
416
417 /* syscache access needs a transaction env. */
420 /* copy dbname out of TX context */
423 }
424
426
427 /* need a tuple descriptor representing four columns */
428 tupdesc = CreateTemplateTupleDesc(4);
429 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
430 TEXTOID, -1, 0);
431 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
432 INT8OID, -1, 0);
433 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
434 TEXTOID, -1, 0);
435 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
436 TEXTOID, -1, 0);
437
438 /* prepare for projection of tuples */
439 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
440
441 /* column 1: system identifier */
442 values[0] = CStringGetTextDatum(sysid);
443
444 /* column 2: timeline */
445 values[1] = Int64GetDatum(currTLI);
446
447 /* column 3: wal location */
448 values[2] = CStringGetTextDatum(xloc);
449
450 /* column 4: database name, or NULL if none */
451 if (dbname)
453 else
454 nulls[3] = true;
455
456 /* send it to dest */
457 do_tup_output(tstate, values, nulls);
458
459 end_tup_output(tstate);
460}
#define UINT64_FORMAT
Definition: c.h:521
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3188
struct cursor * cur
Definition: ecpg.c:29
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:2308
char * dbname
Definition: streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3509
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4734
bool RecoveryInProgress(void)
Definition: xlog.c:6522
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6687

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 283 of file walsender.c.

284{
286
287 /* Create a per-walsender data structure in shared memory */
289
290 /* need resource owner for e.g. basebackups */
292
293 /*
294 * Let postmaster know that we're a WAL sender. Once we've declared us as
295 * a WAL sender process, postmaster will let us outlive the bgwriter and
296 * kill us last in the shutdown sequence, so we get a chance to stream all
297 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
298 * there's no going back, and we mustn't write any WAL records after this.
299 */
302
303 /*
304 * If the client didn't specify a database to connect to, show in PGPROC
305 * that our advertised xmin should affect vacuum horizons in all
306 * databases. This allows physical replication clients to send hot
307 * standby feedback that will delay vacuum cleanup in all databases.
308 */
310 {
312 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
315 LWLockRelease(ProcArrayLock);
316 }
317
318 /* Initialize empty timestamp buffer for lag tracking. */
320}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1290
MemoryContext TopMemoryContext
Definition: mcxt.c:165
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:999
PROC_HDR * ProcGlobal
Definition: proc.c:79
TransactionId xmin
Definition: proc.h:178
uint8 statusFlags
Definition: proc.h:243
int pgxactoff
Definition: proc.h:185
uint8 * statusFlags
Definition: proc.h:387
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:2901
static LagTracker * lag_tracker
Definition: walsender.c:235

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2901 of file walsender.c.

2902{
2903 int i;
2904
2905 /*
2906 * WalSndCtl should be set up already (we inherit this by fork() or
2907 * EXEC_BACKEND mechanism from the postmaster).
2908 */
2909 Assert(WalSndCtl != NULL);
2910 Assert(MyWalSnd == NULL);
2911
2912 /*
2913 * Find a free walsender slot and reserve it. This must not fail due to
2914 * the prior check for free WAL senders in InitProcess().
2915 */
2916 for (i = 0; i < max_wal_senders; i++)
2917 {
2918 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2919
2920 SpinLockAcquire(&walsnd->mutex);
2921
2922 if (walsnd->pid != 0)
2923 {
2924 SpinLockRelease(&walsnd->mutex);
2925 continue;
2926 }
2927 else
2928 {
2929 /*
2930 * Found a free slot. Reserve it for us.
2931 */
2932 walsnd->pid = MyProcPid;
2933 walsnd->state = WALSNDSTATE_STARTUP;
2934 walsnd->sentPtr = InvalidXLogRecPtr;
2935 walsnd->needreload = false;
2936 walsnd->write = InvalidXLogRecPtr;
2937 walsnd->flush = InvalidXLogRecPtr;
2938 walsnd->apply = InvalidXLogRecPtr;
2939 walsnd->writeLag = -1;
2940 walsnd->flushLag = -1;
2941 walsnd->applyLag = -1;
2942 walsnd->sync_standby_priority = 0;
2943 walsnd->replyTime = 0;
2944
2945 /*
2946 * The kind assignment is done here and not in StartReplication()
2947 * and StartLogicalReplication(). Indeed, the logical walsender
2948 * needs to read WAL records (like snapshot of running
2949 * transactions) during the slot creation. So it needs to be woken
2950 * up based on its kind.
2951 *
2952 * The kind assignment could also be done in StartReplication(),
2953 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2954 * seems better to set it in one place.
2955 */
2956 if (MyDatabaseId == InvalidOid)
2958 else
2960
2961 SpinLockRelease(&walsnd->mutex);
2962 /* don't need the lock anymore */
2963 MyWalSnd = (WalSnd *) walsnd;
2964
2965 break;
2966 }
2967 }
2968
2969 Assert(MyWalSnd != NULL);
2970
2971 /* Arrange to clean up at walsender exit */
2973}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:77
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:126
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2977
WalSndCtlData * WalSndCtl
Definition: walsender.c:114
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert(), WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4165 of file walsender.c.

4166{
4167 TimestampTz time = 0;
4168
4169 /* Read all unread samples up to this LSN or end of buffer. */
4170 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4172 {
4174 lag_tracker->last_read[head] =
4176 lag_tracker->read_heads[head] =
4178 }
4179
4180 /*
4181 * If the lag tracker is empty, that means the standby has processed
4182 * everything we've ever sent so we should now clear 'last_read'. If we
4183 * didn't do that, we'd risk using a stale and irrelevant sample for
4184 * interpolation at the beginning of the next burst of WAL after a period
4185 * of idleness.
4186 */
4188 lag_tracker->last_read[head].time = 0;
4189
4190 if (time > now)
4191 {
4192 /* If the clock somehow went backwards, treat as not found. */
4193 return -1;
4194 }
4195 else if (time == 0)
4196 {
4197 /*
4198 * We didn't cross a time. If there is a future sample that we
4199 * haven't reached yet, and we've already reached at least one sample,
4200 * let's interpolate the local flushed time. This is mainly useful
4201 * for reporting a completely stuck apply position as having
4202 * increasing lag, since otherwise we'd have to wait for it to
4203 * eventually start moving again and cross one of our samples before
4204 * we can show the lag increasing.
4205 */
4207 {
4208 /* There are no future samples, so we can't interpolate. */
4209 return -1;
4210 }
4211 else if (lag_tracker->last_read[head].time != 0)
4212 {
4213 /* We can interpolate between last_read and the next sample. */
4214 double fraction;
4215 WalTimeSample prev = lag_tracker->last_read[head];
4217
4218 if (lsn < prev.lsn)
4219 {
4220 /*
4221 * Reported LSNs shouldn't normally go backwards, but it's
4222 * possible when there is a timeline change. Treat as not
4223 * found.
4224 */
4225 return -1;
4226 }
4227
4228 Assert(prev.lsn < next.lsn);
4229
4230 if (prev.time > next.time)
4231 {
4232 /* If the clock somehow went backwards, treat as not found. */
4233 return -1;
4234 }
4235
4236 /* See how far we are between the previous and next samples. */
4237 fraction =
4238 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4239
4240 /* Scale the local flush time proportionally. */
4241 time = (TimestampTz)
4242 ((double) prev.time + (next.time - prev.time) * fraction);
4243 }
4244 else
4245 {
4246 /*
4247 * We have only a future sample, implying that we were entirely
4248 * caught up but now there is a new burst of WAL and the
4249 * standby hasn't processed the first sample yet. Until the
4250 * standby reaches the future sample the best we can do is report
4251 * the hypothetical lag if that sample were to be replayed now.
4252 */
4254 }
4255 }
4256
4257 /* Return the elapsed time since local flush time in microseconds. */
4258 Assert(time != 0);
4259 return now - time;
4260}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
static int32 next
Definition: blutils.c:224
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:229
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:231
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:232
int write_head
Definition: walsender.c:230
TimestampTz time
Definition: walsender.c:219
XLogRecPtr lsn
Definition: walsender.c:218
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:223

References Assert(), LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4100 of file walsender.c.

4101{
4102 bool buffer_full;
4103 int new_write_head;
4104 int i;
4105
4106 if (!am_walsender)
4107 return;
4108
4109 /*
4110 * If the lsn hasn't advanced since last time, then do nothing. This way
4111 * we only record a new sample when new WAL has been written.
4112 */
4113 if (lag_tracker->last_lsn == lsn)
4114 return;
4115 lag_tracker->last_lsn = lsn;
4116
4117 /*
4118 * If advancing the write head of the circular buffer would crash into any
4119 * of the read heads, then the buffer is full. In other words, the
4120 * slowest reader (presumably apply) is the one that controls the release
4121 * of space.
4122 */
4123 new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4124 buffer_full = false;
4125 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4126 {
4127 if (new_write_head == lag_tracker->read_heads[i])
4128 buffer_full = true;
4129 }
4130
4131 /*
4132 * If the buffer is full, for now we just rewind by one slot and overwrite
4133 * the last sample, as a simple (if somewhat uneven) way to lower the
4134 * sampling rate. There may be better adaptive compaction algorithms.
4135 */
4136 if (buffer_full)
4137 {
4138 new_write_head = lag_tracker->write_head;
4139 if (lag_tracker->write_head > 0)
4141 else
4143 }
4144
4145 /* Store a sample at the current write head position. */
4147 lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4148 lag_tracker->write_head = new_write_head;
4149}
XLogRecPtr last_lsn
Definition: walsender.c:228
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState * state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1027 of file walsender.c.

1029{
1030 XLogRecPtr flushptr;
1031 int count;
1032 WALReadError errinfo;
1033 XLogSegNo segno;
1034 TimeLineID currTLI;
1035
1036 /*
1037 * Make sure we have enough WAL available before retrieving the current
1038 * timeline.
1039 */
1040 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1041
1042 /* Fail if not enough (implies we are going to shut down) */
1043 if (flushptr < targetPagePtr + reqLen)
1044 return -1;
1045
1046 /*
1047 * Since logical decoding is also permitted on a standby server, we need
1048 * to check if the server is in recovery to decide how to get the current
1049 * timeline ID (so that it also covers the promotion or timeline change
1050 * cases). We must determine am_cascading_walsender after waiting for the
1051 * required WAL so that it is correct when the walsender wakes up after a
1052 * promotion.
1053 */
1055
1057 GetXLogReplayRecPtr(&currTLI);
1058 else
1059 currTLI = GetWALInsertionTimeLine();
1060
1061 XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1062 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1063 sendTimeLine = state->currTLI;
1064 sendTimeLineValidUpto = state->currTLIValidUntil;
1065 sendTimeLineNextTLI = state->nextTLI;
1066
1067 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1068 count = XLOG_BLCKSZ; /* more than one block available */
1069 else
1070 count = flushptr - targetPagePtr; /* part of the page available */
1071
1072 /* now actually read the data, we know it's there */
1073 if (!WALRead(state,
1074 cur_page,
1075 targetPagePtr,
1076 count,
1077 currTLI, /* Pass the current TLI because only
1078 * WalSndSegmentOpen controls whether new TLI
1079 * is needed. */
1080 &errinfo))
1081 WALReadRaiseError(&errinfo);
1082
1083 /*
1084 * After reading into the buffer, check that what we read was valid. We do
1085 * this after reading, because even though the segment was present when we
1086 * opened it, it might get recycled or removed while we read it. The
1087 * read() succeeds in that case, but the data we tried to read might
1088 * already have been overwritten with new WAL records.
1089 */
1090 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1091 CheckXLogRemoved(segno, state->seg.ws_tli);
1092
1093 return count;
1094}
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:161
static bool sendTimeLineIsHistoric
Definition: walsender.c:163
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1799
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:162
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:164
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6708
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3866
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1504
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1739 of file walsender.c.

1740{
1741 int elevel = got_STOPPING ? ERROR : WARNING;
1742 bool failover_slot;
1743
1744 failover_slot = (replication_active && MyReplicationSlot->data.failover);
1745
1746 /*
1747 * Note that after receiving the shutdown signal, an ERROR is reported if
1748 * any slots are dropped, invalidated, or inactive. This measure is taken
1749 * to prevent the walsender from waiting indefinitely.
1750 */
1751 if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
1752 {
1753 *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1754 return true;
1755 }
1756
1757 *wait_event = 0;
1758 return false;
1759}
#define WARNING
Definition: elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2805

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1771 of file walsender.c.

1773{
1774 /* Check if we need to wait for WALs to be flushed to disk */
1775 if (target_lsn > flushed_lsn)
1776 {
1777 *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1778 return true;
1779 }
1780
1781 /* Check if the standby slots have caught up to the flushed position */
1782 return NeedToWaitForStandbys(flushed_lsn, wait_event);
1783}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1739

References NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 3852 of file walsender.c.

3853{
3854 Interval *result = palloc(sizeof(Interval));
3855
3856 result->month = 0;
3857 result->day = 0;
3858 result->time = offset;
3859
3860 return result;
3861}
void * palloc(Size size)
Definition: mcxt.c:1939
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd,
bool *  reserve_wal,
CRSSnapshotAction * snapshot_action,
bool *  two_phase,
bool *  failover 
)
static

Definition at line 1100 of file walsender.c.

1104{
1105 ListCell *lc;
1106 bool snapshot_action_given = false;
1107 bool reserve_wal_given = false;
1108 bool two_phase_given = false;
1109 bool failover_given = false;
1110
1111 /* Parse options */
1112 foreach(lc, cmd->options)
1113 {
1114 DefElem *defel = (DefElem *) lfirst(lc);
1115
1116 if (strcmp(defel->defname, "snapshot") == 0)
1117 {
1118 char *action;
1119
1120 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1121 ereport(ERROR,
1122 (errcode(ERRCODE_SYNTAX_ERROR),
1123 errmsg("conflicting or redundant options")));
1124
1125 action = defGetString(defel);
1126 snapshot_action_given = true;
1127
1128 if (strcmp(action, "export") == 0)
1129 *snapshot_action = CRS_EXPORT_SNAPSHOT;
1130 else if (strcmp(action, "nothing") == 0)
1131 *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1132 else if (strcmp(action, "use") == 0)
1133 *snapshot_action = CRS_USE_SNAPSHOT;
1134 else
1135 ereport(ERROR,
1136 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1137 errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1138 defel->defname, action)));
1139 }
1140 else if (strcmp(defel->defname, "reserve_wal") == 0)
1141 {
1142 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1143 ereport(ERROR,
1144 (errcode(ERRCODE_SYNTAX_ERROR),
1145 errmsg("conflicting or redundant options")));
1146
1147 reserve_wal_given = true;
1148 *reserve_wal = defGetBoolean(defel);
1149 }
1150 else if (strcmp(defel->defname, "two_phase") == 0)
1151 {
1152 if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1153 ereport(ERROR,
1154 (errcode(ERRCODE_SYNTAX_ERROR),
1155 errmsg("conflicting or redundant options")));
1156 two_phase_given = true;
1157 *two_phase = defGetBoolean(defel);
1158 }
1159 else if (strcmp(defel->defname, "failover") == 0)
1160 {
1161 if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1162 ereport(ERROR,
1163 (errcode(ERRCODE_SYNTAX_ERROR),
1164 errmsg("conflicting or redundant options")));
1165 failover_given = true;
1166 *failover = defGetBoolean(defel);
1167 }
1168 else
1169 elog(ERROR, "unrecognized option: %s", defel->defname);
1170 }
1171}
char * defGetString(DefElem *def)
Definition: define.c:35
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:826
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, failover, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3868 of file walsender.c.

3869{
3870#define PG_STAT_GET_WAL_SENDERS_COLS 12
3871 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3872 SyncRepStandbyData *sync_standbys;
3873 int num_standbys;
3874 int i;
3875
3876 InitMaterializedSRF(fcinfo, 0);
3877
3878 /*
3879 * Get the currently active synchronous standbys. This could be out of
3880 * date before we're done, but we'll use the data anyway.
3881 */
3882 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3883
3884 for (i = 0; i < max_wal_senders; i++)
3885 {
3886 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3887 XLogRecPtr sent_ptr;
3889 XLogRecPtr flush;
3890 XLogRecPtr apply;
3891 TimeOffset writeLag;
3892 TimeOffset flushLag;
3893 TimeOffset applyLag;
3894 int priority;
3895 int pid;
3897 TimestampTz replyTime;
3898 bool is_sync_standby;
3900 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3901 int j;
3902
3903 /* Collect data from shared memory */
3904 SpinLockAcquire(&walsnd->mutex);
3905 if (walsnd->pid == 0)
3906 {
3907 SpinLockRelease(&walsnd->mutex);
3908 continue;
3909 }
3910 pid = walsnd->pid;
3911 sent_ptr = walsnd->sentPtr;
3912 state = walsnd->state;
3913 write = walsnd->write;
3914 flush = walsnd->flush;
3915 apply = walsnd->apply;
3916 writeLag = walsnd->writeLag;
3917 flushLag = walsnd->flushLag;
3918 applyLag = walsnd->applyLag;
3919 priority = walsnd->sync_standby_priority;
3920 replyTime = walsnd->replyTime;
3921 SpinLockRelease(&walsnd->mutex);
3922
3923 /*
3924 * Detect whether walsender is/was considered synchronous. We can
3925 * provide some protection against stale data by checking the PID
3926 * along with walsnd_index.
3927 */
3928 is_sync_standby = false;
3929 for (j = 0; j < num_standbys; j++)
3930 {
3931 if (sync_standbys[j].walsnd_index == i &&
3932 sync_standbys[j].pid == pid)
3933 {
3934 is_sync_standby = true;
3935 break;
3936 }
3937 }
3938
3939 values[0] = Int32GetDatum(pid);
3940
3941 if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3942 {
3943 /*
3944 * Only superusers and roles with privileges of pg_read_all_stats
3945 * can see details. Other users only get the pid value to know
3946 * it's a walsender, but no details.
3947 */
3948 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3949 }
3950 else
3951 {
3953
3954 if (XLogRecPtrIsInvalid(sent_ptr))
3955 nulls[2] = true;
3956 values[2] = LSNGetDatum(sent_ptr);
3957
3959 nulls[3] = true;
3960 values[3] = LSNGetDatum(write);
3961
3962 if (XLogRecPtrIsInvalid(flush))
3963 nulls[4] = true;
3964 values[4] = LSNGetDatum(flush);
3965
3966 if (XLogRecPtrIsInvalid(apply))
3967 nulls[5] = true;
3968 values[5] = LSNGetDatum(apply);
3969
3970 /*
3971 * Treat a standby such as a pg_basebackup background process
3972 * which always returns an invalid flush location, as an
3973 * asynchronous standby.
3974 */
3975 priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3976
3977 if (writeLag < 0)
3978 nulls[6] = true;
3979 else
3981
3982 if (flushLag < 0)
3983 nulls[7] = true;
3984 else
3986
3987 if (applyLag < 0)
3988 nulls[8] = true;
3989 else
3991
3992 values[9] = Int32GetDatum(priority);
3993
3994 /*
3995 * More easily understood version of standby state. This is purely
3996 * informational.
3997 *
3998 * In quorum-based sync replication, the role of each standby
3999 * listed in synchronous_standby_names can be changing very
4000 * frequently. Any standbys considered as "sync" at one moment can
4001 * be switched to "potential" ones at the next moment. So, it's
4002 * basically useless to report "sync" or "potential" as their sync
4003 * states. We report just "quorum" for them.
4004 */
4005 if (priority == 0)
4006 values[10] = CStringGetTextDatum("async");
4007 else if (is_sync_standby)
4009 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4010 else
4011 values[10] = CStringGetTextDatum("potential");
4012
4013 if (replyTime == 0)
4014 nulls[11] = true;
4015 else
4016 values[11] = TimestampTzGetDatum(replyTime);
4017 }
4018
4019 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4020 values, nulls);
4021 }
4022
4023 return (Datum) 0;
4024}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5268
#define MemSet(start, val, len)
Definition: c.h:991
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:78
Oid GetUserId(void)
Definition: miscinit.c:520
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:217
TupleDesc setDesc
Definition: execnodes.h:359
Tuplestorestate * setResult
Definition: execnodes.h:358
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:754
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3852
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3833
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2346 of file walsender.c.

2347{
2348 bool changed = false;
2350
2351 Assert(lsn != InvalidXLogRecPtr);
2352 SpinLockAcquire(&slot->mutex);
2353 if (slot->data.restart_lsn != lsn)
2354 {
2355 changed = true;
2356 slot->data.restart_lsn = lsn;
2357 }
2358 SpinLockRelease(&slot->mutex);
2359
2360 if (changed)
2361 {
2365 }
2366
2367 /*
2368 * One could argue that the slot should be saved to disk now, but that'd
2369 * be energy wasted - the worst thing lost information could cause here is
2370 * to give wrong information in a statistics view - we'll just potentially
2371 * be more conservative in removing files.
2372 */
2373}
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1156
slock_t mutex
Definition: slot.h:158
void PhysicalWakeupLogicalWalSnd(void)
Definition: walsender.c:1714

References Assert(), ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2484 of file walsender.c.

2485{
2486 bool changed = false;
2488
2489 SpinLockAcquire(&slot->mutex);
2491
2492 /*
2493 * For physical replication we don't need the interlock provided by xmin
2494 * and effective_xmin since the consequences of a missed increase are
2495 * limited to query cancellations, so set both at once.
2496 */
2497 if (!TransactionIdIsNormal(slot->data.xmin) ||
2498 !TransactionIdIsNormal(feedbackXmin) ||
2499 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2500 {
2501 changed = true;
2502 slot->data.xmin = feedbackXmin;
2503 slot->effective_xmin = feedbackXmin;
2504 }
2506 !TransactionIdIsNormal(feedbackCatalogXmin) ||
2507 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2508 {
2509 changed = true;
2510 slot->data.catalog_xmin = feedbackCatalogXmin;
2511 slot->effective_catalog_xmin = feedbackCatalogXmin;
2512 }
2513 SpinLockRelease(&slot->mutex);
2514
2515 if (changed)
2516 {
2519 }
2520}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1100
TransactionId xmin
Definition: slot.h:89
TransactionId catalog_xmin
Definition: slot.h:97
TransactionId effective_catalog_xmin
Definition: slot.h:182
TransactionId effective_xmin
Definition: slot.h:181
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1714 of file walsender.c.

1715{
1717
1718 /*
1719 * If we are running in a standby, there is no need to wake up walsenders.
1720 * This is because we do not support syncing slots to cascading standbys,
1721 * so, there are no walsenders waiting for standbys to catch up.
1722 */
1723 if (RecoveryInProgress())
1724 return;
1725
1728}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2772
#define SlotIsPhysical(slot)
Definition: slot.h:220
ConditionVariable wal_confirm_rcv_cv

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1595 of file walsender.c.

1596{
1597 for (;;)
1598 {
1599 long sleeptime;
1600
1601 /* Check for input from the client */
1603
1604 /* die if timeout was reached */
1606
1607 /* Send keepalive if the time has come */
1609
1610 if (!pq_is_send_pending())
1611 break;
1612
1614
1615 /* Sleep until something happens or we time out */
1617 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1618
1619 /* Clear any already-pending wakeups */
1621
1623
1624 /* Process any requests or signals received recently */
1626 {
1627 ConfigReloadPending = false;
1630 }
1631
1632 /* Try to flush pending output to the client */
1633 if (pq_flush_if_writable() != 0)
1635 }
1636
1637 /* reactivate latch so WalSndLoop knows to continue */
1639}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
struct Latch * MyLatch
Definition: globals.c:64
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:288
void ResetLatch(Latch *latch)
Definition: latch.c:372
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:445
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_SOCKET_WRITEABLE
Definition: waiteventset.h:36
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3692
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2732
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2202
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:4062
static pg_noreturn void WalSndShutdown(void)
Definition: walsender.c:366
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2688

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2202 of file walsender.c.

2203{
2204 unsigned char firstchar;
2205 int maxmsglen;
2206 int r;
2207 bool received = false;
2208
2210
2211 /*
2212 * If we already received a CopyDone from the frontend, any subsequent
2213 * message is the beginning of a new command, and should be processed in
2214 * the main processing loop.
2215 */
2216 while (!streamingDoneReceiving)
2217 {
2219 r = pq_getbyte_if_available(&firstchar);
2220 if (r < 0)
2221 {
2222 /* unexpected error or EOF */
2224 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2225 errmsg("unexpected EOF on standby connection")));
2226 proc_exit(0);
2227 }
2228 if (r == 0)
2229 {
2230 /* no data available without blocking */
2231 pq_endmsgread();
2232 break;
2233 }
2234
2235 /* Validate message type and set packet size limit */
2236 switch (firstchar)
2237 {
2238 case PqMsg_CopyData:
2239 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2240 break;
2241 case PqMsg_CopyDone:
2242 case PqMsg_Terminate:
2243 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2244 break;
2245 default:
2246 ereport(FATAL,
2247 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2248 errmsg("invalid standby message type \"%c\"",
2249 firstchar)));
2250 maxmsglen = 0; /* keep compiler quiet */
2251 break;
2252 }
2253
2254 /* Read the message contents */
2256 if (pq_getmessage(&reply_message, maxmsglen))
2257 {
2259 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2260 errmsg("unexpected EOF on standby connection")));
2261 proc_exit(0);
2262 }
2263
2264 /* ... and process it */
2265 switch (firstchar)
2266 {
2267 /*
2268 * 'd' means a standby reply wrapped in a CopyData packet.
2269 */
2270 case PqMsg_CopyData:
2272 received = true;
2273 break;
2274
2275 /*
2276 * CopyDone means the standby requested to finish streaming.
2277 * Reply with CopyDone, if we had not sent that already.
2278 */
2279 case PqMsg_CopyDone:
2281 {
2282 pq_putmessage_noblock('c', NULL, 0);
2283 streamingDoneSending = true;
2284 }
2285
2287 received = true;
2288 break;
2289
2290 /*
2291 * 'X' means that the standby is closing down the socket.
2292 */
2293 case PqMsg_Terminate:
2294 proc_exit(0);
2295
2296 default:
2297 Assert(false); /* NOT REACHED */
2298 }
2299 }
2300
2301 /*
2302 * Save the last reply timestamp if we've received at least one reply.
2303 */
2304 if (received)
2305 {
2308 }
2309}
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1004
void pq_endmsgread(void)
Definition: pqcomm.c:1166
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
static bool waiting_for_ping_response
Definition: walsender.c:187
static TimestampTz last_processing
Definition: walsender.c:178
static bool streamingDoneSending
Definition: walsender.c:195
static void ProcessStandbyMessage(void)
Definition: walsender.c:2315
static bool streamingDoneReceiving
Definition: walsender.c:196

References Assert(), COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2564 of file walsender.c.

2565{
2566 TransactionId feedbackXmin;
2567 uint32 feedbackEpoch;
2568 TransactionId feedbackCatalogXmin;
2569 uint32 feedbackCatalogEpoch;
2570 TimestampTz replyTime;
2571
2572 /*
2573 * Decipher the reply message. The caller already consumed the msgtype
2574 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2575 * of this message.
2576 */
2577 replyTime = pq_getmsgint64(&reply_message);
2578 feedbackXmin = pq_getmsgint(&reply_message, 4);
2579 feedbackEpoch = pq_getmsgint(&reply_message, 4);
2580 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2581 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2582
2584 {
2585 char *replyTimeStr;
2586
2587 /* Copy because timestamptz_to_str returns a static buffer */
2588 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2589
2590 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2591 feedbackXmin,
2592 feedbackEpoch,
2593 feedbackCatalogXmin,
2594 feedbackCatalogEpoch,
2595 replyTimeStr);
2596
2597 pfree(replyTimeStr);
2598 }
2599
2600 /*
2601 * Update shared state for this WalSender process based on reply data from
2602 * standby.
2603 */
2604 {
2605 WalSnd *walsnd = MyWalSnd;
2606
2607 SpinLockAcquire(&walsnd->mutex);
2608 walsnd->replyTime = replyTime;
2609 SpinLockRelease(&walsnd->mutex);
2610 }
2611
2612 /*
2613 * Unset WalSender's xmins if the feedback message values are invalid.
2614 * This happens when the downstream turned hot_standby_feedback off.
2615 */
2616 if (!TransactionIdIsNormal(feedbackXmin)
2617 && !TransactionIdIsNormal(feedbackCatalogXmin))
2618 {
2620 if (MyReplicationSlot != NULL)
2621 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2622 return;
2623 }
2624
2625 /*
2626 * Check that the provided xmin/epoch are sane, that is, not in the future
2627 * and not so far back as to be already wrapped around. Ignore if not.
2628 */
2629 if (TransactionIdIsNormal(feedbackXmin) &&
2630 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2631 return;
2632
2633 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2634 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2635 return;
2636
2637 /*
2638 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2639 * the xmin will be taken into account by GetSnapshotData() /
2640 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2641 * thereby prevent the generation of cleanup conflicts on the standby
2642 * server.
2643 *
2644 * There is a small window for a race condition here: although we just
2645 * checked that feedbackXmin precedes nextXid, the nextXid could have
2646 * gotten advanced between our fetching it and applying the xmin below,
2647 * perhaps far enough to make feedbackXmin wrap around. In that case the
2648 * xmin we set here would be "in the future" and have no effect. No point
2649 * in worrying about this since it's too late to save the desired data
2650 * anyway. Assuming that the standby sends us an increasing sequence of
2651 * xmins, this could only happen during the first reply cycle, else our
2652 * own xmin would prevent nextXid from advancing so far.
2653 *
2654 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2655 * is assumed atomic, and there's no real need to prevent concurrent
2656 * horizon determinations. (If we're moving our xmin forward, this is
2657 * obviously safe, and if we're moving it backwards, well, the data is at
2658 * risk already since a VACUUM could already have determined the horizon.)
2659 *
2660 * If we're using a replication slot we reserve the xmin via that,
2661 * otherwise via the walsender's PGPROC entry. We can only track the
2662 * catalog xmin separately when using a slot, so we store the least of the
2663 * two provided when not using a slot.
2664 *
2665 * XXX: It might make sense to generalize the ephemeral slot concept and
2666 * always use the slot mechanism to handle the feedback xmin.
2667 */
2668 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2669 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2670 else
2671 {
2672 if (TransactionIdIsNormal(feedbackCatalogXmin)
2673 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2674 MyProc->xmin = feedbackCatalogXmin;
2675 else
2676 MyProc->xmin = feedbackXmin;
2677 }
2678}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
uint32_t uint32
Definition: c.h:502
uint32 TransactionId
Definition: c.h:623
bool message_level_is_interesting(int elevel)
Definition: elog.c:273
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:2321
void pfree(void *pointer)
Definition: mcxt.c:2146
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2484
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2533

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2315 of file walsender.c.

2316{
2317 char msgtype;
2318
2319 /*
2320 * Check message type from the first byte.
2321 */
2322 msgtype = pq_getmsgbyte(&reply_message);
2323
2324 switch (msgtype)
2325 {
2326 case 'r':
2328 break;
2329
2330 case 'h':
2332 break;
2333
2334 default:
2336 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2337 errmsg("unexpected message type \"%c\"", msgtype)));
2338 proc_exit(0);
2339 }
2340}
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2564
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2379

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2379 of file walsender.c.

2380{
2381 XLogRecPtr writePtr,
2382 flushPtr,
2383 applyPtr;
2384 bool replyRequested;
2385 TimeOffset writeLag,
2386 flushLag,
2387 applyLag;
2388 bool clearLagTimes;
2390 TimestampTz replyTime;
2391
2392 static bool fullyAppliedLastTime = false;
2393
2394 /* the caller already consumed the msgtype byte */
2395 writePtr = pq_getmsgint64(&reply_message);
2396 flushPtr = pq_getmsgint64(&reply_message);
2397 applyPtr = pq_getmsgint64(&reply_message);
2398 replyTime = pq_getmsgint64(&reply_message);
2399 replyRequested = pq_getmsgbyte(&reply_message);
2400
2402 {
2403 char *replyTimeStr;
2404
2405 /* Copy because timestamptz_to_str returns a static buffer */
2406 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2407
2408 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2409 LSN_FORMAT_ARGS(writePtr),
2410 LSN_FORMAT_ARGS(flushPtr),
2411 LSN_FORMAT_ARGS(applyPtr),
2412 replyRequested ? " (reply requested)" : "",
2413 replyTimeStr);
2414
2415 pfree(replyTimeStr);
2416 }
2417
2418 /* See if we can compute the round-trip lag for these positions. */
2420 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2421 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2422 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2423
2424 /*
2425 * If the standby reports that it has fully replayed the WAL in two
2426 * consecutive reply messages, then the second such message must result
2427 * from wal_receiver_status_interval expiring on the standby. This is a
2428 * convenient time to forget the lag times measured when it last
2429 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2430 * until more WAL traffic arrives.
2431 */
2432 clearLagTimes = false;
2433 if (applyPtr == sentPtr)
2434 {
2435 if (fullyAppliedLastTime)
2436 clearLagTimes = true;
2437 fullyAppliedLastTime = true;
2438 }
2439 else
2440 fullyAppliedLastTime = false;
2441
2442 /* Send a reply if the standby requested one. */
2443 if (replyRequested)
2445
2446 /*
2447 * Update shared state for this WalSender process based on reply data from
2448 * standby.
2449 */
2450 {
2451 WalSnd *walsnd = MyWalSnd;
2452
2453 SpinLockAcquire(&walsnd->mutex);
2454 walsnd->write = writePtr;
2455 walsnd->flush = flushPtr;
2456 walsnd->apply = applyPtr;
2457 if (writeLag != -1 || clearLagTimes)
2458 walsnd->writeLag = writeLag;
2459 if (flushLag != -1 || clearLagTimes)
2460 walsnd->flushLag = flushLag;
2461 if (applyLag != -1 || clearLagTimes)
2462 walsnd->applyLag = applyLag;
2463 walsnd->replyTime = replyTime;
2464 SpinLockRelease(&walsnd->mutex);
2465 }
2466
2469
2470 /*
2471 * Advance our local xmin horizon when the client confirmed a flush.
2472 */
2473 if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2474 {
2477 else
2479 }
2480}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1818
#define SlotIsLogical(slot)
Definition: slot.h:221
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:474
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:170
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2346
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:4039
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4165

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd * cmd)
static

Definition at line 464 of file walsender.c.

465{
466#define READ_REPLICATION_SLOT_COLS 3
467 ReplicationSlot *slot;
469 TupOutputState *tstate;
470 TupleDesc tupdesc;
472 bool nulls[READ_REPLICATION_SLOT_COLS];
473
475 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
476 TEXTOID, -1, 0);
477 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
478 TEXTOID, -1, 0);
479 /* TimeLineID is unsigned, so int4 is not wide enough. */
480 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
481 INT8OID, -1, 0);
482
483 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
484
485 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
486 slot = SearchNamedReplicationSlot(cmd->slotname, false);
487 if (slot == NULL || !slot->in_use)
488 {
489 LWLockRelease(ReplicationSlotControlLock);
490 }
491 else
492 {
493 ReplicationSlot slot_contents;
494 int i = 0;
495
496 /* Copy slot contents while holding spinlock */
497 SpinLockAcquire(&slot->mutex);
498 slot_contents = *slot;
499 SpinLockRelease(&slot->mutex);
500 LWLockRelease(ReplicationSlotControlLock);
501
502 if (OidIsValid(slot_contents.data.database))
504 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
505 errmsg("cannot use %s with a logical replication slot",
506 "READ_REPLICATION_SLOT"));
507
508 /* slot type */
509 values[i] = CStringGetTextDatum("physical");
510 nulls[i] = false;
511 i++;
512
513 /* start LSN */
514 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
515 {
516 char xloc[64];
517
518 snprintf(xloc, sizeof(xloc), "%X/%X",
519 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
521 nulls[i] = false;
522 }
523 i++;
524
525 /* timeline this WAL was produced on */
526 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
527 {
528 TimeLineID slots_position_timeline;
529 TimeLineID current_timeline;
530 List *timeline_history = NIL;
531
532 /*
533 * While in recovery, use as timeline the currently-replaying one
534 * to get the LSN position's history.
535 */
536 if (RecoveryInProgress())
537 (void) GetXLogReplayRecPtr(&current_timeline);
538 else
539 current_timeline = GetWALInsertionTimeLine();
540
541 timeline_history = readTimeLineHistory(current_timeline);
542 slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
543 timeline_history);
544 values[i] = Int64GetDatum((int64) slots_position_timeline);
545 nulls[i] = false;
546 }
547 i++;
548
550 }
551
553 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
554 do_tup_output(tstate, values, nulls);
555 end_tup_output(tstate);
556}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
int64_t int64
Definition: c.h:499
#define OidIsValid(objectId)
Definition: c.h:746
@ LW_SHARED
Definition: lwlock.h:115
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:479
Definition: pg_list.h:54
bool in_use
Definition: slot.h:161
#define READ_REPLICATION_SLOT_COLS

References Assert(), begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 563 of file walsender.c.

564{
566 TupleDesc tupdesc;
568 char histfname[MAXFNAMELEN];
569 char path[MAXPGPATH];
570 int fd;
571 off_t histfilelen;
572 off_t bytesleft;
573 Size len;
574
576
577 /*
578 * Reply with a result set with one row, and two columns. The first col is
579 * the name of the history file, 2nd is the contents.
580 */
581 tupdesc = CreateTemplateTupleDesc(2);
582 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
583 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
584
585 TLHistoryFileName(histfname, cmd->timeline);
586 TLHistoryFilePath(path, cmd->timeline);
587
588 /* Send a RowDescription message */
589 dest->rStartup(dest, CMD_SELECT, tupdesc);
590
591 /* Send a DataRow message */
593 pq_sendint16(&buf, 2); /* # of columns */
594 len = strlen(histfname);
595 pq_sendint32(&buf, len); /* col1 len */
596 pq_sendbytes(&buf, histfname, len);
597
598 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
599 if (fd < 0)
602 errmsg("could not open file \"%s\": %m", path)));
603
604 /* Determine file length and send it to client */
605 histfilelen = lseek(fd, 0, SEEK_END);
606 if (histfilelen < 0)
609 errmsg("could not seek to end of file \"%s\": %m", path)));
610 if (lseek(fd, 0, SEEK_SET) != 0)
613 errmsg("could not seek to beginning of file \"%s\": %m", path)));
614
615 pq_sendint32(&buf, histfilelen); /* col2 len */
616
617 bytesleft = histfilelen;
618 while (bytesleft > 0)
619 {
620 PGAlignedBlock rbuf;
621 int nread;
622
623 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
624 nread = read(fd, rbuf.data, sizeof(rbuf));
626 if (nread < 0)
629 errmsg("could not read file \"%s\": %m",
630 path)));
631 else if (nread == 0)
634 errmsg("could not read file \"%s\": read %d of %zu",
635 path, nread, (Size) bytesleft)));
636
637 pq_sendbytes(&buf, rbuf.data, nread);
638 bytesleft -= nread;
639 }
640
641 if (CloseTransientFile(fd) != 0)
644 errmsg("could not close file \"%s\": %m", path)));
645
647}
#define PG_BINARY
Definition: c.h:1244
size_t Size
Definition: c.h:576
int errcode_for_file_access(void)
Definition: elog.c:877
int CloseTransientFile(int fd)
Definition: fd.c:2871
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2694
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:271
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PqMsg_DataRow
Definition: protocol.h:43
TimeLineID timeline
Definition: replnodes.h:120
char data[BLCKSZ]
Definition: c.h:1090
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, generate_unaccent_rules::dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1433 of file walsender.c.

1434{
1436 QueryCompletion qc;
1437
1438 /* make sure that our requirements are still fulfilled */
1440
1442
1443 ReplicationSlotAcquire(cmd->slotname, true, true);
1444
1445 /*
1446 * Force a disconnect, so that the decoding code doesn't need to care
1447 * about an eventual switch from running in recovery, to running in a
1448 * normal environment. Client code is expected to handle reconnects.
1449 */
1451 {
1452 ereport(LOG,
1453 (errmsg("terminating walsender process after promotion")));
1454 got_STOPPING = true;
1455 }
1456
1457 /*
1458 * Create our decoding context, making it start at the previously ack'ed
1459 * position.
1460 *
1461 * Do this before sending a CopyBothResponse message, so that any errors
1462 * are reported early.
1463 */
1465 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1467 .segment_open = WalSndSegmentOpen,
1468 .segment_close = wal_segment_close),
1472
1474
1475 /* Send a CopyBothResponse message, and start streaming */
1477 pq_sendbyte(&buf, 0);
1478 pq_sendint16(&buf, 0);
1480 pq_flush();
1481
1482 /* Start reading WAL from the oldest required WAL. */
1485
1486 /*
1487 * Report the location after which we'll send out further commits as the
1488 * current sentPtr.
1489 */
1491
1492 /* Also update the sent position status in shared memory */
1496
1497 replication_active = true;
1498
1500
1501 /* Main loop of walsender */
1503
1506
1507 replication_active = false;
1508 if (got_STOPPING)
1509 proc_exit(0);
1511
1512 /* Get out of COPY mode (CommandComplete). */
1513 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1514 EndCommand(&qc, DestRemote, false);
1515}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:169
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:496
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:559
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:97
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2759
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:213
static void XLogSendLogical(void)
Definition: walsender.c:3381
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:232

References am_cascading_walsender, Assert(), buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 795 of file walsender.c.

796{
798 XLogRecPtr FlushPtr;
799 TimeLineID FlushTLI;
800
801 /* create xlogreader for physical replication */
802 xlogreader =
804 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
805 .segment_close = wal_segment_close),
806 NULL);
807
808 if (!xlogreader)
810 (errcode(ERRCODE_OUT_OF_MEMORY),
811 errmsg("out of memory"),
812 errdetail("Failed while allocating a WAL reading processor.")));
813
814 /*
815 * We assume here that we're logging enough information in the WAL for
816 * log-shipping, since this is checked in PostmasterMain().
817 *
818 * NOTE: wal_level can only change at shutdown, so in most cases it is
819 * difficult for there to be WAL data that we can still see that was
820 * written at wal_level='minimal'.
821 */
822
823 if (cmd->slotname)
824 {
825 ReplicationSlotAcquire(cmd->slotname, true, true);
828 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
829 errmsg("cannot use a logical replication slot for physical replication")));
830
831 /*
832 * We don't need to verify the slot's restart_lsn here; instead we
833 * rely on the caller requesting the starting point to use. If the
834 * WAL segment doesn't exist, we'll fail later.
835 */
836 }
837
838 /*
839 * Select the timeline. If it was given explicitly by the client, use
840 * that. Otherwise use the timeline of the last replayed record.
841 */
844 FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
845 else
846 FlushPtr = GetFlushRecPtr(&FlushTLI);
847
848 if (cmd->timeline != 0)
849 {
850 XLogRecPtr switchpoint;
851
852 sendTimeLine = cmd->timeline;
853 if (sendTimeLine == FlushTLI)
854 {
857 }
858 else
859 {
860 List *timeLineHistory;
861
863
864 /*
865 * Check that the timeline the client requested exists, and the
866 * requested start location is on that timeline.
867 */
868 timeLineHistory = readTimeLineHistory(FlushTLI);
869 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
871 list_free_deep(timeLineHistory);
872
873 /*
874 * Found the requested timeline in the history. Check that
875 * requested startpoint is on that timeline in our history.
876 *
877 * This is quite loose on purpose. We only check that we didn't
878 * fork off the requested timeline before the switchpoint. We
879 * don't check that we switched *to* it before the requested
880 * starting point. This is because the client can legitimately
881 * request to start replication from the beginning of the WAL
882 * segment that contains switchpoint, but on the new timeline, so
883 * that it doesn't end up with a partial segment. If you ask for
884 * too old a starting point, you'll get an error later when we
885 * fail to find the requested WAL segment in pg_wal.
886 *
887 * XXX: we could be more strict here and only allow a startpoint
888 * that's older than the switchpoint, if it's still in the same
889 * WAL segment.
890 */
891 if (!XLogRecPtrIsInvalid(switchpoint) &&
892 switchpoint < cmd->startpoint)
893 {
895 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
897 cmd->timeline),
898 errdetail("This server's history forked from timeline %u at %X/%X.",
899 cmd->timeline,
900 LSN_FORMAT_ARGS(switchpoint))));
901 }
902 sendTimeLineValidUpto = switchpoint;
903 }
904 }
905 else
906 {
907 sendTimeLine = FlushTLI;
910 }
911
913
914 /* If there is nothing to stream, don't even enter COPY mode */
916 {
917 /*
918 * When we first start replication the standby will be behind the
919 * primary. For some applications, for example synchronous
920 * replication, it is important to have a clear state for this initial
921 * catchup mode, so we can trigger actions when we change streaming
922 * state later. We may stay in this state for a long time, which is
923 * exactly why we want to be able to monitor whether or not we are
924 * still here.
925 */
927
928 /* Send a CopyBothResponse message, and start streaming */
930 pq_sendbyte(&buf, 0);
931 pq_sendint16(&buf, 0);
933 pq_flush();
934
935 /*
936 * Don't allow a request to stream from a future point in WAL that
937 * hasn't been flushed to disk in this server yet.
938 */
939 if (FlushPtr < cmd->startpoint)
940 {
942 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
944 LSN_FORMAT_ARGS(FlushPtr))));
945 }
946
947 /* Start streaming from the requested point */
948 sentPtr = cmd->startpoint;
949
950 /* Initialize shared memory status, too */
954
956
957 /* Main loop of walsender */
958 replication_active = true;
959
961
962 replication_active = false;
963 if (got_STOPPING)
964 proc_exit(0);
966
968 }
969
970 if (cmd->slotname)
972
973 /*
974 * Copy is finished now. Send a single-row result set indicating the next
975 * timeline.
976 */
978 {
979 char startpos_str[8 + 1 + 8 + 1];
981 TupOutputState *tstate;
982 TupleDesc tupdesc;
983 Datum values[2];
984 bool nulls[2] = {0};
985
986 snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
988
990
991 /*
992 * Need a tuple descriptor representing two columns. int8 may seem
993 * like a surprising data type for this, but in theory int4 would not
994 * be wide enough for this, as TimeLineID is unsigned.
995 */
996 tupdesc = CreateTemplateTupleDesc(2);
997 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
998 INT8OID, -1, 0);
999 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1000 TEXTOID, -1, 0);
1001
1002 /* prepare for projection of tuple */
1003 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1004
1006 values[1] = CStringGetTextDatum(startpos_str);
1007
1008 /* send it to dest */
1009 do_tup_output(tstate, values, nulls);
1010
1011 end_tup_output(tstate);
1012 }
1013
1014 /* Send CommandComplete message */
1015 EndReplicationCommand("START_STREAMING");
1016}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1204
void list_free_deep(List *list)
Definition: list.c:1560
TimeLineID timeline
Definition: replnodes.h:96
static void XLogSendPhysical(void)
Definition: walsender.c:3071
int wal_segment_size
Definition: xlog.c:143
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:107

References am_cascading_walsender, Assert(), begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2533 of file walsender.c.

2534{
/*
 * Decide whether the caller-supplied (xid, epoch) pair is consistent with
 * the next full transaction ID obtained from ReadNextFullTransactionId().
 * Returns true only when the epoch check passes and xid precedes or equals
 * the next xid; returns false otherwise.
 */
2535 FullTransactionId nextFullXid;
2536 TransactionId nextXid;
2537 uint32 nextEpoch;
2538
/* Split the next full transaction ID into its xid and epoch parts. */
2539 nextFullXid = ReadNextFullTransactionId();
2540 nextXid = XidFromFullTransactionId(nextFullXid);
2541 nextEpoch = EpochFromFullTransactionId(nextFullXid);
2542
/*
 * An xid numerically at or below nextXid is accepted only with the same
 * epoch as nextXid; a numerically larger xid is accepted only with the
 * epoch immediately before it.
 */
2543 if (xid <= nextXid)
2544 {
2545 if (epoch != nextEpoch)
2546 return false;
2547 }
2548 else
2549 {
2550 if (epoch + 1 != nextEpoch)
2551 return false;
2552 }
2553
/* Even with an acceptable epoch, xid must still precede or equal nextXid. */
2554 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2555 return false; /* epoch OK, but it's wrapped around */
2556
2557 return true;
2558}
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 653 of file walsender.c.

654{
655 MemoryContext mcxt;
657 off_t offset = 0;
659
660 /*
661 * parsing the manifest will use the cryptohash stuff, which requires a
662 * resource owner
663 */
666 CurrentResourceOwner == NULL);
668
669 /* Prepare to read manifest data into a temporary context. */
671 "incremental backup information",
674
675 /* Send a CopyInResponse message */
677 pq_sendbyte(&buf, 0);
678 pq_sendint16(&buf, 0);
680 pq_flush();
681
682 /* Receive packets from client until done. */
683 while (HandleUploadManifestPacket(&buf, &offset, ib))
684 ;
685
686 /* Finish up manifest processing. */
688
689 /*
690 * Discard any old manifest information and arrange to preserve the new
691 * information we just got.
692 *
693 * We assume that MemoryContextDelete and MemoryContextSetParent won't
694 * fail, and thus we shouldn't end up bailing out of here in such a way as
695 * to leave dangling pointers.
696 */
697 if (uploaded_manifest_mcxt != NULL)
702
703 /* clean up the resource owner we created */
705}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:668
MemoryContext CacheMemoryContext
Definition: mcxt.c:168
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:314
#define PqMsg_CopyInResponse
Definition: protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1019
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition: resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:719
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:153

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2732 of file walsender.c.

2733{
2734 TimestampTz timeout;
2735
2736 /* don't bail out if we're doing something that doesn't require timeouts */
2737 if (last_reply_timestamp <= 0)
2738 return;
2739
2742
2743 if (wal_sender_timeout > 0 && last_processing >= timeout)
2744 {
2745 /*
2746 * Since typically expiration of replication timeout means
2747 * communication problem, we don't send the error message to the
2748 * standby.
2749 */
2751 (errmsg("terminating walsender process due to replication timeout")));
2752
2754 }
2755}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:128

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2688 of file walsender.c.

2689{
2690 long sleeptime = 10000; /* 10 s */
2691
2693 {
2694 TimestampTz wakeup_time;
2695
2696 /*
2697 * At the latest stop sleeping once wal_sender_timeout has been
2698 * reached.
2699 */
2702
2703 /*
2704 * If no ping has been sent yet, wakeup when it's time to do so.
2705 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2706 * the timeout passed without a response.
2707 */
2710 wal_sender_timeout / 2);
2711
2712 /* Compute relative time until wakeup. */
2713 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2714 }
2715
2716 return sleeptime;
2717}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3466 of file walsender.c.

3467{
3468 XLogRecPtr replicatedPtr;
3469
3470 /* ... let's just be real sure we're caught up ... */
3471 send_data();
3472
3473 /*
3474 * To figure out whether all WAL has successfully been replicated, check
3475 * flush location if valid, write otherwise. Tools like pg_receivewal will
3476 * usually (unless in synchronous mode) return an invalid flush location.
3477 */
3478 replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3480
3481 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3483 {
3484 QueryCompletion qc;
3485
3486 /* Inform the standby that XLOG streaming is done */
3487 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3488 EndCommand(&qc, DestRemote, false);
3489 pq_flush();
3490
3491 proc_exit(0);
3492 }
3495}
static bool WalSndCaughtUp
Definition: walsender.c:199

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 330 of file walsender.c.

331{
336
337 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
339
340 if (MyReplicationSlot != NULL)
342
344
345 replication_active = false;
346
347 /*
348 * If there is a transaction in progress, it will clean up our
349 * ResourceOwner, but if a replication command set up a resource owner
350 * without a transaction, we've got to clean that up now.
351 */
354
356 proc_exit(0);
357
358 /* Revert back to startup state */
360}
void pgaio_error_cleanup(void)
Definition: aio.c:1062
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1953
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:775
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:202
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 3833 of file walsender.c.

3834{
3835 switch (state)
3836 {
3838 return "startup";
3839 case WALSNDSTATE_BACKUP:
3840 return "backup";
3842 return "catchup";
3844 return "streaming";
3846 return "stopping";
3847 }
3848 return "UNKNOWN";
3849}
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3750 of file walsender.c.

3751{
3752 int i;
3753
3754 for (i = 0; i < max_wal_senders; i++)
3755 {
3756 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3757 pid_t pid;
3758
3759 SpinLockAcquire(&walsnd->mutex);
3760 pid = walsnd->pid;
3761 SpinLockRelease(&walsnd->mutex);
3762
3763 if (pid == 0)
3764 continue;
3765
3767 }
3768}
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:283
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 4039 of file walsender.c.

4040{
4041 elog(DEBUG2, "sending replication keepalive");
4042
4043 /* construct the message... */
4046 pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
4048 pq_sendbyte(&output_message, requestReply ? 1 : 0);
4049
4050 /* ... and send it wrapped in CopyData */
4052
4053 /* Set local flag */
4054 if (requestReply)
4056}
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4062 of file walsender.c.

4063{
4064 TimestampTz ping_time;
4065
4066 /*
4067 * Don't send keepalive messages if timeouts are globally disabled or
4068 * we're doing something not partaking in timeouts.
4069 */
4071 return;
4072
4074 return;
4075
4076 /*
4077 * If half of wal_sender_timeout has lapsed without receiving any reply
4078 * from the standby, send a keep-alive message to the standby requesting
4079 * an immediate reply.
4080 */
4082 wal_sender_timeout / 2);
4083 if (last_processing >= ping_time)
4084 {
4086
4087 /* Try to flush pending output to the client */
4088 if (pq_flush_if_writable() != 0)
4090 }
4091}

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2977 of file walsender.c.

2978{
/*
 * Detach this process from its WalSnd entry: clear the MyWalSnd pointer
 * and reset the entry's pid to 0 while holding the entry's spinlock.
 * Neither parameter (code, arg) is referenced in the body.
 *
 * NOTE(review): the (int, Datum) signature and the reference from
 * InitWalSenderSlot() suggest this is registered as an exit callback --
 * confirm against the caller.
 */
2979 WalSnd *walsnd = MyWalSnd;
2980
2981 Assert(walsnd != NULL);
2982
/* Clear the global pointer first; the local copy is used from here on. */
2983 MyWalSnd = NULL;
2984
2985 SpinLockAcquire(&walsnd->mutex);
2986 /* Mark WalSnd struct as no longer being in use. */
2987 walsnd->pid = 0;
2988 SpinLockRelease(&walsnd->mutex);
2989}

References Assert(), WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3587 of file walsender.c.

3588{
3589 got_SIGUSR2 = true;
3591}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2759 of file walsender.c.

2760{
2761 TimestampTz last_flush = 0;
2762
2763 /*
2764 * Initialize the last reply timestamp. That enables timeout processing
2765 * from hereon.
2766 */
2769
2770 /*
2771 * Loop until we reach the end of this timeline or the client requests to
2772 * stop streaming.
2773 */
2774 for (;;)
2775 {
2776 /* Clear any already-pending wakeups */
2778
2780
2781 /* Process any requests or signals received recently */
2783 {
2784 ConfigReloadPending = false;
2787 }
2788
2789 /* Check for input from the client */
2791
2792 /*
2793 * If we have received CopyDone from the client, sent CopyDone
2794 * ourselves, and the output buffer is empty, it's time to exit
2795 * streaming.
2796 */
2799 break;
2800
2801 /*
2802 * If we don't have any pending data in the output buffer, try to send
2803 * some more. If there is some, we don't bother to call send_data
2804 * again until we've flushed it ... but we'd better assume we are not
2805 * caught up.
2806 */
2807 if (!pq_is_send_pending())
2808 send_data();
2809 else
2810 WalSndCaughtUp = false;
2811
2812 /* Try to flush pending output to the client */
2813 if (pq_flush_if_writable() != 0)
2815
2816 /* If nothing remains to be sent right now ... */
2818 {
2819 /*
2820 * If we're in catchup state, move to streaming. This is an
2821 * important state change for users to know about, since before
2822 * this point data loss might occur if the primary dies and we
2823 * need to failover to the standby. The state change is also
2824 * important for synchronous replication, since commits that
2825 * started to wait at that point might wait for some time.
2826 */
2828 {
2830 (errmsg_internal("\"%s\" has now caught up with upstream server",
2833 }
2834
2835 /*
2836 * When SIGUSR2 arrives, we send any outstanding logs up to the
2837 * shutdown checkpoint record (i.e., the latest record), wait for
2838 * them to be replicated to the standby, and exit. This may be a
2839 * normal termination at shutdown, or a promotion, the walsender
2840 * is not sure which.
2841 */
2842 if (got_SIGUSR2)
2843 WalSndDone(send_data);
2844 }
2845
2846 /* Check for replication timeout. */
2848
2849 /* Send keepalive if the time has come */
2851
2852 /*
2853 * Block if we have unsent data. XXX For logical replication, let
2854 * WalSndWaitForWal() handle any other blocking; idle receivers need
2855 * its additional actions. For physical replication, also block if
2856 * caught up; its send_data does not block.
2857 *
2858 * The IO statistics are reported in WalSndWaitForWal() for the
2859 * logical WAL senders.
2860 */
2861 if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2864 {
2865 long sleeptime;
2866 int wakeEvents;
2868
2870 wakeEvents = WL_SOCKET_READABLE;
2871 else
2872 wakeEvents = 0;
2873
2874 /*
2875 * Use fresh timestamp, not last_processing, to reduce the chance
2876 * of reaching wal_sender_timeout before sending a keepalive.
2877 */
2879 sleeptime = WalSndComputeSleeptime(now);
2880
2881 if (pq_is_send_pending())
2882 wakeEvents |= WL_SOCKET_WRITEABLE;
2883
2884 /* Report IO statistics, if needed */
2885 if (TimestampDifferenceExceeds(last_flush, now,
2887 {
2888 pgstat_flush_io(false);
2890 last_flush = now;
2891 }
2892
2893 /* Sleep until something happens or we time out */
2894 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2895 }
2896 }
2897}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
char * application_name
Definition: guc_tables.c:560
bool pgstat_flush_backend(bool nowait, bits32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition: pgstat_io.c:183
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition: walsender.c:100
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3466

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1526 of file walsender.c.

/*
 * Begin a new outgoing message in ctx->out.  The buffer is reset and then
 * filled with a header: the type byte 'w', the LSN written twice (as
 * dataStart and as walEnd), and a 64-bit zero that reserves room for the
 * send time.
 *
 * lsn        - position stamped into the header; replaced by
 *              InvalidXLogRecPtr unless last_write is true.
 * xid        - not used by this function.
 * last_write - true when the header should carry the caller's lsn.
 */
1527{
1528 /* can't have sync rep confused by sending the same LSN several times */
1529 if (!last_write)
1530 lsn = InvalidXLogRecPtr;
1531
/* discard whatever was previously accumulated in the output buffer */
1532 resetStringInfo(ctx->out);
1533
/* NOTE(review): 'w' is presumably the protocol's WAL-data message type -- confirm */
1534 pq_sendbyte(ctx->out, 'w');
1535 pq_sendint64(ctx->out, lsn); /* dataStart */
1536 pq_sendint64(ctx->out, lsn); /* walEnd */
1537
1538 /*
1539 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1540 * reserve space here.
1541 */
1542 pq_sendint64(ctx->out, 0); /* sendtime */
1543}
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3542 of file walsender.c.

/*
 * Set the needreload flag on every walsender slot whose pid is nonzero.
 * All max_wal_senders entries of WalSndCtl->walsnds are visited, and each
 * one is examined and updated while holding that entry's own spinlock.
 */
3543{
3544 int i;
3545
3546 for (i = 0; i < max_wal_senders; i++)
3547 {
3548 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3549
3550 SpinLockAcquire(&walsnd->mutex);
/* pid == 0: presumably the slot is not in use, so there is nothing to flag (TODO confirm) */
3551 if (walsnd->pid == 0)
3552 {
3553 SpinLockRelease(&walsnd->mutex);
3554 continue;
3555 }
/* the flag is written under the spinlock, which is released right after */
3556 walsnd->needreload = true;
3557 SpinLockRelease(&walsnd->mutex);
3558 }
3559}

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState * state,
XLogSegNo  nextSegNo,
TimeLineID * tli_p 
)
static

Definition at line 2993 of file walsender.c.

2995{
2996 char path[MAXPGPATH];
2997
2998 /*-------
2999 * When reading from a historic timeline, and there is a timeline switch
3000 * within this segment, read from the WAL segment belonging to the new
3001 * timeline.
3002 *
3003 * For example, imagine that this server is currently on timeline 5, and
3004 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3005 * 0/13002088. In pg_wal, we have these files:
3006 *
3007 * ...
3008 * 000000040000000000000012
3009 * 000000040000000000000013
3010 * 000000050000000000000013
3011 * 000000050000000000000014
3012 * ...
3013 *
3014 * In this situation, when requested to send the WAL from segment 0x13, on
3015 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3016 * recovery prefers files from newer timelines, so if the segment was
3017 * restored from the archive on this server, the file belonging to the old
3018 * timeline, 000000040000000000000013, might not exist. Their contents are
3019 * equal up to the switchpoint, because at a timeline switch, the used
3020 * portion of the old segment is copied to the new file.
3021 */
3022 *tli_p = sendTimeLine;
3024 {
3025 XLogSegNo endSegNo;
3026
3027 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3028 if (nextSegNo == endSegNo)
3029 *tli_p = sendTimeLineNextTLI;
3030 }
3031
3032 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3033 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3034 if (state->seg.ws_file >= 0)
3035 return;
3036
3037 /*
3038 * If the file is not found, assume it's because the standby asked for a
3039 * too old WAL segment that has already been removed or recycled.
3040 */
3041 if (errno == ENOENT)
3042 {
3043 char xlogfname[MAXFNAMELEN];
3044 int save_errno = errno;
3045
3046 XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
3047 errno = save_errno;
3048 ereport(ERROR,
3050 errmsg("requested WAL segment %s has already been removed",
3051 xlogfname)));
3052 }
3053 else
3054 ereport(ERROR,
3056 errmsg("could not open file \"%s\": %m",
3057 path)));
3058}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1089
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3814 of file walsender.c.

3815{
3816 WalSnd *walsnd = MyWalSnd;
3817
3819
3820 if (walsnd->state == state)
3821 return;
3822
3823 SpinLockAcquire(&walsnd->mutex);
3824 walsnd->state = state;
3825 SpinLockRelease(&walsnd->mutex);
3826}

References am_walsender, Assert(), WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3626 of file walsender.c.

3627{
3628 bool found;
3629 int i;
3630
3632 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3633
3634 if (!found)
3635 {
3636 /* First time through, so initialize */
3638
3639 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3641
3642 for (i = 0; i < max_wal_senders; i++)
3643 {
3644 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3645
3646 SpinLockInit(&walsnd->mutex);
3647 }
3648
3652 }
3653}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3614

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3614 of file walsender.c.

/*
 * Compute a size in bytes: the offset of the walsnds member within
 * WalSndCtlData, plus max_wal_senders elements of sizeof(WalSnd) each.
 *
 * Returns the resulting Size.
 */
3615{
/* the initial 0 is overwritten by the assignment just below */
3616 Size size = 0;
3617
/* fixed-size part: everything that precedes the walsnds member */
3618 size = offsetof(WalSndCtlData, walsnds);
/* NOTE(review): add_size()/mul_size() presumably guard against Size overflow -- confirm in shmem.c */
3619 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3620
3621 return size;
3622}
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 366 of file walsender.c.

367{
368 /*
369 * Reset whereToSendOutput to prevent ereport from attempting to send any
370 * more messages to the standby.
371 */
374
375 proc_exit(0);
376 abort(); /* keep the compiler quiet */
377}
@ DestNone
Definition: dest.h:87
CommandDest whereToSendOutput
Definition: postgres.c:91

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3595 of file walsender.c.

3596{
3597 /* Set up signal handlers */
3599 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3600 pqsignal(SIGTERM, die); /* request shutdown */
3601 /* SIGQUIT handler was already set up by InitPostmasterChild */
3602 InitializeTimeouts(); /* establishes SIGALRM handler */
3603 pqsignal(SIGPIPE, SIG_IGN);
3605 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3606 * shutdown */
3607
3608 /* Reset some signals that are accepted by postmaster but not here */
3609 pqsignal(SIGCHLD, SIG_DFL);
3610}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:65
#define die(msg)
#define pqsignal
Definition: port.h:531
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3058
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:673
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3587
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1649 of file walsender.c.

1651{
1652 static TimestampTz sendTime = 0;
1654 bool pending_writes = false;
1655 bool end_xact = ctx->end_xact;
1656
1657 /*
1658 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1659 * avoid flooding the lag tracker when we commit frequently.
1660 *
1661 * We don't have a mechanism to get the ack for any LSN other than end
1662 * xact LSN from the downstream. So, we track lag only for end of
1663 * transaction LSN.
1664 */
1665#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1666 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1668 {
1669 LagTrackerWrite(lsn, now);
1670 sendTime = now;
1671 }
1672
1673 /*
1674 * When skipping empty transactions in synchronous replication, we send a
1675 * keepalive message to avoid delaying such transactions.
1676 *
1677 * It is okay to check sync_standbys_status without lock here as in the
1678 * worst case we will just send an extra keepalive message when it is
1679 * really not required.
1680 */
1681 if (skipped_xact &&
1682 SyncRepRequested() &&
1683 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1684 {
1685 WalSndKeepalive(false, lsn);
1686
1687 /* Try to flush pending output to the client */
1688 if (pq_flush_if_writable() != 0)
1690
1691 /* If we have pending write here, make sure it's actually flushed */
1692 if (pq_is_send_pending())
1693 pending_writes = true;
1694 }
1695
1696 /*
1697 * Process pending writes if any or try to send a keepalive if required.
1698 * We don't need to try sending keep alive messages at the transaction end
1699 * as that will be done at a later point in time. This is required only
1700 * for large transactions where we don't send any changes to the
1701 * downstream and the receiver can timeout due to that.
1702 */
1703 if (pending_writes || (!end_xact &&
1705 wal_sender_timeout / 2)))
1707}
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1595
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:4100
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3692 of file walsender.c.

3693{
3694 WaitEvent event;
3695
3696 ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3697
3698 /*
3699 * We use a condition variable to efficiently wake up walsenders in
3700 * WalSndWakeup().
3701 *
3702 * Every walsender prepares to sleep on a shared memory CV. Note that it
3703 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3704 * waitlist), but does not actually wait on the CV (IOW, it never calls
3705 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3706 * waiting, because we also need to wait for socket events. The processes
3707 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3708 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3709 * walsenders come out of WaitEventSetWait().
3710 *
3711 * This approach is simple and efficient because one doesn't have to loop
3712 * through all the walsender slots, with a spinlock acquisition and
3713 * release for every iteration, just to wake up only the waiting
3714 * walsenders. It makes WalSndWakeup() callers' life easy.
3715 *
3716 * XXX: A desirable future improvement would be to add support for CVs
3717 * into WaitEventSetWait().
3718 *
3719 * And, we use separate shared memory CVs for physical and logical
3720 * walsenders for selective wake ups, see WalSndWakeup() for more details.
3721 *
3722 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3723 * until awakened by physical walsenders after the walreceiver confirms
3724 * the receipt of the LSN.
3725 */
3726 if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
3732
3733 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3734 (event.events & WL_POSTMASTER_DEATH))
3735 {
3737 proc_exit(1);
3738 }
3739
3741}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:166
uint32 events
Definition: waiteventset.h:62
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: waiteventset.c:655
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
Definition: waiteventset.h:38

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1799 of file walsender.c.

1800{
1801 int wakeEvents;
1802 uint32 wait_event = 0;
1803 static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1804 TimestampTz last_flush = 0;
1805
1806 /*
1807 * Fast path to avoid acquiring the spinlock in case we already know we
1808 * have enough WAL available and all the standby servers have confirmed
1809 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1810 * if we're far behind.
1811 */
1812 if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
1813 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1814 return RecentFlushPtr;
1815
1816 /*
1817 * Within the loop, we wait for the necessary WALs to be flushed to disk
1818 * first, followed by waiting for standbys to catch up if there are enough
1819 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1820 */
1821 for (;;)
1822 {
1823 bool wait_for_standby_at_stop = false;
1824 long sleeptime;
1826
1827 /* Clear any already-pending wakeups */
1829
1831
1832 /* Process any requests or signals received recently */
1834 {
1835 ConfigReloadPending = false;
1838 }
1839
1840 /* Check for input from the client */
1842
1843 /*
1844 * If we're shutting down, trigger pending WAL to be written out,
1845 * otherwise we'd possibly end up waiting for WAL that never gets
1846 * written, because walwriter has shut down already.
1847 */
1848 if (got_STOPPING)
1850
1851 /*
1852 * To avoid the scenario where standbys need to catch up to a newer
1853 * WAL location in each iteration, we update our idea of the currently
1854 * flushed position only if we are not waiting for standbys to catch
1855 * up.
1856 */
1857 if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1858 {
1859 if (!RecoveryInProgress())
1860 RecentFlushPtr = GetFlushRecPtr(NULL);
1861 else
1862 RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1863 }
1864
1865 /*
1866 * If postmaster asked us to stop and the standby slots have caught up
1867 * to the flushed position, don't wait anymore.
1868 *
1869 * It's important to do this check after the recomputation of
1870 * RecentFlushPtr, so we can send all remaining data before shutting
1871 * down.
1872 */
1873 if (got_STOPPING)
1874 {
1875 if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
1876 wait_for_standby_at_stop = true;
1877 else
1878 break;
1879 }
1880
1881 /*
1882 * We only send regular messages to the client for full decoded
1883 * transactions, but a synchronous replication and walsender shutdown
1884 * possibly are waiting for a later location. So, before sleeping, we
1885 * send a ping containing the flush location. If the receiver is
1886 * otherwise idle, this keepalive will trigger a reply. Processing the
1887 * reply will update these MyWalSnd locations.
1888 */
1889 if (MyWalSnd->flush < sentPtr &&
1890 MyWalSnd->write < sentPtr &&
1893
1894 /*
1895 * Exit the loop if already caught up and doesn't need to wait for
1896 * standby slots.
1897 */
1898 if (!wait_for_standby_at_stop &&
1899 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1900 break;
1901
1902 /*
1903 * Waiting for new WAL or waiting for standbys to catch up. Since we
1904 * need to wait, we're now caught up.
1905 */
1906 WalSndCaughtUp = true;
1907
1908 /*
1909 * Try to flush any pending output to the client.
1910 */
1911 if (pq_flush_if_writable() != 0)
1913
1914 /*
1915 * If we have received CopyDone from the client, sent CopyDone
1916 * ourselves, and the output buffer is empty, it's time to exit
1917 * streaming, so fail the current WAL fetch request.
1918 */
1921 break;
1922
1923 /* die if timeout was reached */
1925
1926 /* Send keepalive if the time has come */
1928
1929 /*
1930 * Sleep until something happens or we time out. Also wait for the
1931 * socket becoming writable, if there's still pending output.
1932 * Otherwise we might sit on sendable output data while waiting for
1933 * new WAL to be generated. (But if we have nothing to send, we don't
1934 * want to wake on socket-writable.)
1935 */
1937 sleeptime = WalSndComputeSleeptime(now);
1938
1939 wakeEvents = WL_SOCKET_READABLE;
1940
1941 if (pq_is_send_pending())
1942 wakeEvents |= WL_SOCKET_WRITEABLE;
1943
1944 Assert(wait_event != 0);
1945
1946 /* Report IO statistics, if needed */
1947 if (TimestampDifferenceExceeds(last_flush, now,
1949 {
1950 pgstat_flush_io(false);
1952 last_flush = now;
1953 }
1954
1955 WalSndWait(wakeEvents, sleeptime, wait_event);
1956 }
1957
1958 /* reactivate latch so WalSndLoop knows to continue */
1960 return RecentFlushPtr;
1961}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1771
bool XLogBackgroundFlush(void)
Definition: xlog.c:3111

References Assert(), CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsInvalid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3776 of file walsender.c.

/*
 * Block until every walsender slot with a nonzero pid reports the state
 * WALSNDSTATE_STOPPING.  The slot array is rescanned in a loop, sleeping
 * 10 msec between scans; the function returns as soon as one full scan
 * finds no slot in any other state.
 */
3777{
3778 for (;;)
3779 {
3780 int i;
3781 bool all_stopped = true;
3782
3783 for (i = 0; i < max_wal_senders; i++)
3784 {
3785 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3786
/* pid and state are read while holding the slot's spinlock */
3787 SpinLockAcquire(&walsnd->mutex);
3788
/* pid == 0: presumably an unused slot, which cannot hold up the wait (TODO confirm) */
3789 if (walsnd->pid == 0)
3790 {
3791 SpinLockRelease(&walsnd->mutex);
3792 continue;
3793 }
3794
/* one slot in another state is enough to abandon this scan */
3795 if (walsnd->state != WALSNDSTATE_STOPPING)
3796 {
3797 all_stopped = false;
3798 SpinLockRelease(&walsnd->mutex);
3799 break;
3800 }
3801 SpinLockRelease(&walsnd->mutex);
3802 }
3803
3804 /* safe to leave if confirmation is done for all WAL senders */
3805 if (all_stopped)
3806 return;
3807
3808 pg_usleep(10000L); /* wait for 10 msec */
3809 }
3810}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3671 of file walsender.c.

3672{
3673 /*
3674 * Wake up all the walsenders waiting on WAL being flushed or replayed
3675 * respectively. Note that waiting walsender would have prepared to sleep
3676 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3677 * before actually waiting.
3678 */
3679 if (physical)
3681
3682 if (logical)
3684}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1553 of file walsender.c.

1555{
1557
1558 /*
1559 * Fill the send timestamp last, so that it is taken as late as possible.
1560 * This is somewhat ugly, but the protocol is set as it's already used for
1561 * several releases by streaming physical replication.
1562 */
1566 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1567 tmpbuf.data, sizeof(int64));
1568
1569 /* output previously gathered data in a CopyData packet */
1570 pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1571
1573
1574 /* Try to flush pending output to the client */
1575 if (pq_flush_if_writable() != 0)
1577
1578 /* Try taking fast path unless we get too close to walsender timeout. */
1580 wal_sender_timeout / 2) &&
1582 {
1583 return;
1584 }
1585
1586 /* If we have pending write here, go to slow path */
1588}

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3381 of file walsender.c.

3382{
3383 XLogRecord *record;
3384 char *errm;
3385
3386 /*
3387 * We'll use the current flush point to determine whether we've caught up.
3388 * This variable is static in order to cache it across calls. Caching is
3389 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3390 * spinlock.
3391 */
3392 static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3393
3394 /*
3395 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3396 * true in WalSndWaitForWal, if we're actually waiting. We also set to
3397 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3398 * didn't wait - i.e. when we're shutting down.
3399 */
3400 WalSndCaughtUp = false;
3401
3402 record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3403
3404 /* xlog record was invalid */
3405 if (errm != NULL)
3406 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3407 errm);
3408
3409 if (record != NULL)
3410 {
3411 /*
3412 * Note the lack of any call to LagTrackerWrite() which is handled by
3413 * WalSndUpdateProgress which is called by output plugin through
3414 * logical decoding write api.
3415 */
3417
3419 }
3420
3421 /*
3422 * If first time through in this session, initialize flushPtr. Otherwise,
3423 * we only need to update flushPtr if EndRecPtr is past it.
3424 */
3425 if (flushPtr == InvalidXLogRecPtr ||
3427 {
3429 flushPtr = GetStandbyFlushRecPtr(NULL);
3430 else
3431 flushPtr = GetFlushRecPtr(NULL);
3432 }
3433
3434 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3435 if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3436 WalSndCaughtUp = true;
3437
3438 /*
3439 * If we're caught up and have been requested to stop, have WalSndLoop()
3440 * terminate the connection in an orderly manner, after writing out all
3441 * the pending data.
3442 */
3444 got_SIGUSR2 = true;
3445
3446 /* Update shared memory status */
3447 {
3448 WalSnd *walsnd = MyWalSnd;
3449
3450 SpinLockAcquire(&walsnd->mutex);
3451 walsnd->sentPtr = sentPtr;
3452 SpinLockRelease(&walsnd->mutex);
3453 }
3454}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:88
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:390

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3071 of file walsender.c.

3072{
3073 XLogRecPtr SendRqstPtr;
3074 XLogRecPtr startptr;
3075 XLogRecPtr endptr;
3076 Size nbytes;
3077 XLogSegNo segno;
3078 WALReadError errinfo;
3079 Size rbytes;
3080
3081 /* If requested switch the WAL sender to the stopping state. */
3082 if (got_STOPPING)
3084
3086 {
3087 WalSndCaughtUp = true;
3088 return;
3089 }
3090
3091 /* Figure out how far we can safely send the WAL. */
3093 {
3094 /*
3095 * Streaming an old timeline that's in this server's history, but is
3096 * not the one we're currently inserting or replaying. It can be
3097 * streamed up to the point where we switched off that timeline.
3098 */
3099 SendRqstPtr = sendTimeLineValidUpto;
3100 }
3101 else if (am_cascading_walsender)
3102 {
3103 TimeLineID SendRqstTLI;
3104
3105 /*
3106 * Streaming the latest timeline on a standby.
3107 *
3108 * Attempt to send all WAL that has already been replayed, so that we
3109 * know it's valid. If we're receiving WAL through streaming
3110 * replication, it's also OK to send any WAL that has been received
3111 * but not replayed.
3112 *
3113 * The timeline we're recovering from can change, or we can be
3114 * promoted. In either case, the current timeline becomes historic. We
3115 * need to detect that so that we don't try to stream past the point
3116 * where we switched to another timeline. We check for promotion or
3117 * timeline switch after calculating FlushPtr, to avoid a race
3118 * condition: if the timeline becomes historic just after we checked
3119 * that it was still current, it's still OK to stream it up to the
3120 * FlushPtr that was calculated before it became historic.
3121 */
3122 bool becameHistoric = false;
3123
3124 SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3125
3126 if (!RecoveryInProgress())
3127 {
3128 /* We have been promoted. */
3129 SendRqstTLI = GetWALInsertionTimeLine();
3130 am_cascading_walsender = false;
3131 becameHistoric = true;
3132 }
3133 else
3134 {
3135 /*
3136 * Still a cascading standby. But is the timeline we're sending
3137 * still the one recovery is recovering from?
3138 */
3139 if (sendTimeLine != SendRqstTLI)
3140 becameHistoric = true;
3141 }
3142
3143 if (becameHistoric)
3144 {
3145 /*
3146 * The timeline we were sending has become historic. Read the
3147 * timeline history file of the new timeline to see where exactly
3148 * we forked off from the timeline we were sending.
3149 */
3150 List *history;
3151
3152 history = readTimeLineHistory(SendRqstTLI);
3154
3156 list_free_deep(history);
3157
3159
3160 SendRqstPtr = sendTimeLineValidUpto;
3161 }
3162 }
3163 else
3164 {
3165 /*
3166 * Streaming the current timeline on a primary.
3167 *
3168 * Attempt to send all data that's already been written out and
3169 * fsync'd to disk. We cannot go further than what's been written out
3170 * given the current implementation of WALRead(). And in any case
3171 * it's unsafe to send WAL that is not securely down to disk on the
3172 * primary: if the primary subsequently crashes and restarts, standbys
3173 * must not have applied any WAL that got lost on the primary.
3174 */
3175 SendRqstPtr = GetFlushRecPtr(NULL);
3176 }
3177
3178 /*
3179 * Record the current system time as an approximation of the time at which
3180 * this WAL location was written for the purposes of lag tracking.
3181 *
3182 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3183 * is flushed and we could get that time as well as the LSN when we call
3184 * GetFlushRecPtr() above (and likewise for the cascading standby
3185 * equivalent), but rather than putting any new code into the hot WAL path
3186 * it seems good enough to capture the time here. We should reach this
3187 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3188 * may take some time, we read the WAL flush pointer and take the time
3189 * very close together here so that we'll get a later position if it is
3190 * still moving.
3191 *
3192 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3193 * this gives us a cheap approximation for the WAL flush time for this
3194 * LSN.
3195 *
3196 * Note that the LSN is not necessarily the LSN for the data contained in
3197 * the present message; it's the end of the WAL, which might be further
3198 * ahead. All the lag tracking machinery cares about is finding out when
3199 * that arbitrary LSN is eventually reported as written, flushed and
3200 * applied, so that it can measure the elapsed time.
3201 */
3202 LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3203
3204 /*
3205 * If this is a historic timeline and we've reached the point where we
3206 * forked to the next timeline, stop streaming.
3207 *
3208 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3209 * startup process will normally replay all WAL that has been received
3210 * from the primary, before promoting, but if the WAL streaming is
3211 * terminated at a WAL page boundary, the valid portion of the timeline
3212 * might end in the middle of a WAL record. We might've already sent the
3213 * first half of that partial WAL record to the cascading standby, so that
3214 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3215 * replay the partial WAL record either, so it can still follow our
3216 * timeline switch.
3217 */
3219 {
3220 /* close the current file. */
3221 if (xlogreader->seg.ws_file >= 0)
3223
3224 /* Send CopyDone */
3225 pq_putmessage_noblock('c', NULL, 0);
3226 streamingDoneSending = true;
3227
3228 WalSndCaughtUp = true;
3229
3230 elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3233 return;
3234 }
3235
3236 /* Do we have any work to do? */
3237 Assert(sentPtr <= SendRqstPtr);
3238 if (SendRqstPtr <= sentPtr)
3239 {
3240 WalSndCaughtUp = true;
3241 return;
3242 }
3243
3244 /*
3245 * Figure out how much to send in one message. If there's no more than
3246 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3247 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3248 *
3249 * The rounding is not only for performance reasons. Walreceiver relies on
3250 * the fact that we never split a WAL record across two messages. Since a
3251 * long WAL record is split at page boundary into continuation records,
3252 * page boundary is always a safe cut-off point. We also assume that
3253 * SendRqstPtr never points to the middle of a WAL record.
3254 */
3255 startptr = sentPtr;
3256 endptr = startptr;
3257 endptr += MAX_SEND_SIZE;
3258
3259 /* if we went beyond SendRqstPtr, back off */
3260 if (SendRqstPtr <= endptr)
3261 {
3262 endptr = SendRqstPtr;
3264 WalSndCaughtUp = false;
3265 else
3266 WalSndCaughtUp = true;
3267 }
3268 else
3269 {
3270 /* round down to page boundary. */
3271 endptr -= (endptr % XLOG_BLCKSZ);
3272 WalSndCaughtUp = false;
3273 }
3274
3275 nbytes = endptr - startptr;
3276 Assert(nbytes <= MAX_SEND_SIZE);
3277
3278 /*
3279 * OK to read and send the slice.
3280 */
3283
3284 pq_sendint64(&output_message, startptr); /* dataStart */
3285 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3286 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3287
3288 /*
3289 * Read the log directly into the output buffer to avoid extra memcpy
3290 * calls.
3291 */
3293
3294retry:
3295 /* attempt to read WAL from WAL buffers first */
3297 startptr, nbytes, xlogreader->seg.ws_tli);
3298 output_message.len += rbytes;
3299 startptr += rbytes;
3300 nbytes -= rbytes;
3301
3302 /* now read the remaining WAL from WAL file */
3303 if (nbytes > 0 &&
3306 startptr,
3307 nbytes,
3308 xlogreader->seg.ws_tli, /* Pass the current TLI because
3309 * only WalSndSegmentOpen controls
3310 * whether new TLI is needed. */
3311 &errinfo))
3312 WALReadRaiseError(&errinfo);
3313
3314 /* See logical_read_xlog_page(). */
3315 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3317
3318 /*
3319 * During recovery, the currently-open WAL file might be replaced with the
3320 * file of the same name retrieved from archive. So we always need to
3321 * check what we read was valid after reading into the buffer. If it's
3322 * invalid, we try to open and read the file again.
3323 */
3325 {
3326 WalSnd *walsnd = MyWalSnd;
3327 bool reload;
3328
3329 SpinLockAcquire(&walsnd->mutex);
3330 reload = walsnd->needreload;
3331 walsnd->needreload = false;
3332 SpinLockRelease(&walsnd->mutex);
3333
3334 if (reload && xlogreader->seg.ws_file >= 0)
3335 {
3337
3338 goto retry;
3339 }
3340 }
3341
3342 output_message.len += nbytes;
3344
3345 /*
3346 * Fill the send timestamp last, so that it is taken as late as possible.
3347 */
3350 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3351 tmpbuf.data, sizeof(int64));
3352
3354
3355 sentPtr = endptr;
3356
3357 /* Update shared memory status */
3358 {
3359 WalSnd *walsnd = MyWalSnd;
3360
3361 SpinLockAcquire(&walsnd->mutex);
3362 walsnd->sentPtr = sentPtr;
3363 SpinLockRelease(&walsnd->mutex);
3364 }
3365
3366 /* Report progress of XLOG streaming in PS display */
3368 {
3369 char activitymsg[50];
3370
3371 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3373 set_ps_display(activitymsg);
3374 }
3375}
bool update_process_title
Definition: ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:337
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:271
#define MAX_SEND_SIZE
Definition: walsender.c:111
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition: xlog.c:1759

References am_cascading_walsender, Assert(), CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 123 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 235 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 213 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

StringInfoData output_message
static

Definition at line 173 of file walsender.c.

Referenced by exec_replication_command(), WalSndKeepalive(), and XLogSendPhysical().

◆ replication_active

volatile sig_atomic_t replication_active = false
static

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 196 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 152 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 153 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

bool waiting_for_ping_response = false
static

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 135 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader