PostgreSQL Source Code git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 223 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 111 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 100 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 241 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd *  cmd)
static

Definition at line 1391 of file walsender.c.

1392{
1393 bool failover_given = false;
1394 bool two_phase_given = false;
1395 bool failover;
1396 bool two_phase;
1397
1398 /* Parse options */
1399 foreach_ptr(DefElem, defel, cmd->options)
1400 {
1401 if (strcmp(defel->defname, "failover") == 0)
1402 {
1403 if (failover_given)
1404 ereport(ERROR,
1405 (errcode(ERRCODE_SYNTAX_ERROR),
1406 errmsg("conflicting or redundant options")));
1407 failover_given = true;
1408 failover = defGetBoolean(defel);
1409 }
1410 else if (strcmp(defel->defname, "two_phase") == 0)
1411 {
1412 if (two_phase_given)
1413 ereport(ERROR,
1414 (errcode(ERRCODE_SYNTAX_ERROR),
1415 errmsg("conflicting or redundant options")));
1416 two_phase_given = true;
1417 two_phase = defGetBoolean(defel);
1418 }
1419 else
1420 elog(ERROR, "unrecognized option: %s", defel->defname);
1421 }
1422
1423 ReplicationSlotAlter(cmd->slotname,
1424 failover_given ? &failover : NULL,
1425 two_phase_given ? &two_phase : NULL);
1426}
bool defGetBoolean(DefElem *def)
Definition: define.c:94
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
static bool failover
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:837

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, failover, foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *  cmd)
static

Definition at line 1177 of file walsender.c.

1178{
1179 const char *snapshot_name = NULL;
1180 char xloc[MAXFNAMELEN];
1181 char *slot_name;
1182 bool reserve_wal = false;
1183 bool two_phase = false;
1184 bool failover = false;
1185 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1187 TupOutputState *tstate;
1188 TupleDesc tupdesc;
1189 Datum values[4];
1190 bool nulls[4] = {0};
1191
1193
1194 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1195 &failover);
1196
1197 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1198 {
1199 ReplicationSlotCreate(cmd->slotname, false,
1201 false, false, false);
1202
1203 if (reserve_wal)
1204 {
1206
1208
1209 /* Write this slot to disk if it's a permanent one. */
1210 if (!cmd->temporary)
1212 }
1213 }
1214 else
1215 {
1217 bool need_full_snapshot = false;
1218
1220
1222
1223 /*
1224 * Initially create persistent slot as ephemeral - that allows us to
1225 * nicely handle errors during initialization because it'll get
1226 * dropped if this transaction fails. We'll make it persistent at the
1227 * end. Temporary slots can be created as temporary from beginning as
1228 * they get dropped on error as well.
1229 */
1232 two_phase, failover, false);
1233
1234 /*
1235 * Do options check early so that we can bail before calling the
1236 * DecodingContextFindStartpoint which can take long time.
1237 */
1238 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1239 {
1240 if (IsTransactionBlock())
1241 ereport(ERROR,
1242 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1243 (errmsg("%s must not be called inside a transaction",
1244 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1245
1246 need_full_snapshot = true;
1247 }
1248 else if (snapshot_action == CRS_USE_SNAPSHOT)
1249 {
1250 if (!IsTransactionBlock())
1251 ereport(ERROR,
1252 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1253 (errmsg("%s must be called inside a transaction",
1254 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1255
1257 ereport(ERROR,
1258 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1259 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1260 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1261 if (!XactReadOnly)
1262 ereport(ERROR,
1263 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1264 (errmsg("%s must be called in a read-only transaction",
1265 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1266
1267 if (FirstSnapshotSet)
1268 ereport(ERROR,
1269 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1270 (errmsg("%s must be called before any query",
1271 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1272
1273 if (IsSubTransaction())
1274 ereport(ERROR,
1275 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1276 (errmsg("%s must not be called in a subtransaction",
1277 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1278
1279 need_full_snapshot = true;
1280 }
1281
1282 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1285 .segment_open = WalSndSegmentOpen,
1286 .segment_close = wal_segment_close),
1289
1290 /*
1291 * Signal that we don't need the timeout mechanism. We're just
1292 * creating the replication slot and don't yet accept feedback
1293 * messages or send keepalives. As we possibly need to wait for
1294 * further WAL the walsender would otherwise possibly be killed too
1295 * soon.
1296 */
1298
1299 /* build initial snapshot, might take a while */
1301
1302 /*
1303 * Export or use the snapshot if we've been asked to do so.
1304 *
1305 * NB. We will convert the snapbuild.c kind of snapshot to normal
1306 * snapshot when doing this.
1307 */
1308 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1309 {
1310 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1311 }
1312 else if (snapshot_action == CRS_USE_SNAPSHOT)
1313 {
1314 Snapshot snap;
1315
1318 }
1319
1320 /* don't need the decoding context anymore */
1322
1323 if (!cmd->temporary)
1325 }
1326
1327 snprintf(xloc, sizeof(xloc), "%X/%X",
1329
1331
1332 /*----------
1333 * Need a tuple descriptor representing four columns:
1334 * - first field: the slot name
1335 * - second field: LSN at which we became consistent
1336 * - third field: exported snapshot's name
1337 * - fourth field: output plugin
1338 */
1339 tupdesc = CreateTemplateTupleDesc(4);
1340 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1341 TEXTOID, -1, 0);
1342 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1343 TEXTOID, -1, 0);
1344 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1345 TEXTOID, -1, 0);
1346 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1347 TEXTOID, -1, 0);
1348
1349 /* prepare for projection of tuples */
1350 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1351
1352 /* slot_name */
1353 slot_name = NameStr(MyReplicationSlot->data.name);
1354 values[0] = CStringGetTextDatum(slot_name);
1355
1356 /* consistent wal location */
1357 values[1] = CStringGetTextDatum(xloc);
1358
1359 /* snapshot name, or NULL if none */
1360 if (snapshot_name != NULL)
1361 values[2] = CStringGetTextDatum(snapshot_name);
1362 else
1363 nulls[2] = true;
1364
1365 /* plugin, or NULL if none */
1366 if (cmd->plugin != NULL)
1368 else
1369 nulls[3] = true;
1370
1371 /* send it to dest */
1372 do_tup_output(tstate, values, nulls);
1373 end_tup_output(tstate);
1374
1376}
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:717
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2464
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2522
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2444
Assert(PointerIsAligned(start, uint64))
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:675
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:631
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:330
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:109
#define NIL
Definition: pg_list.h:68
#define snprintf
Definition: port.h:239
uintptr_t Datum
Definition: postgres.h:69
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:324
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1061
void ReplicationSlotReserveWal(void)
Definition: slot.c:1452
void ReplicationSlotPersist(void)
Definition: slot.c:1078
ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
void ReplicationSlotSave(void)
Definition: slot.c:1043
void ReplicationSlotRelease(void)
Definition: slot.c:686
@ RS_PERSISTENT
Definition: slot.h:38
@ RS_EPHEMERAL
Definition: slot.h:39
@ RS_TEMPORARY
Definition: slot.h:40
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:440
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
bool FirstSnapshotSet
Definition: snapmgr.c:192
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1843
PGPROC * MyProc
Definition: proc.c:67
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:111
ReplicationSlotPersistentData data
Definition: slot.h:185
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:175
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:911
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1100
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:3017
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1553
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1649
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1027
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1526
static TimestampTz last_reply_timestamp
Definition: walsender.c:184
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:82
int XactIsoLevel
Definition: xact.c:79
bool IsSubTransaction(void)
Definition: xact.c:5044
bool IsTransactionBlock(void)
Definition: xact.c:4971
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References Assert(), begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, failover, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *  cmd)
static

Definition at line 1382 of file walsender.c.

1383{
1384 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1385}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:814

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1970 of file walsender.c.

1971{
1972 yyscan_t scanner;
1973 int parse_rc;
1974 Node *cmd_node;
1975 const char *cmdtag;
1976 MemoryContext old_context = CurrentMemoryContext;
1977
1978 /* We save and re-use the cmd_context across calls */
1979 static MemoryContext cmd_context = NULL;
1980
1981 /*
1982 * If WAL sender has been told that shutdown is getting close, switch its
1983 * status accordingly to handle the next replication commands correctly.
1984 */
1985 if (got_STOPPING)
1987
1988 /*
1989 * Throw error if in stopping mode. We need prevent commands that could
1990 * generate WAL while the shutdown checkpoint is being written. To be
1991 * safe, we just prohibit all new commands.
1992 */
1994 ereport(ERROR,
1995 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1996 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1997
1998 /*
1999 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2000 * command arrives. Clean up the old stuff if there's anything.
2001 */
2003
2005
2006 /*
2007 * Prepare to parse and execute the command.
2008 *
2009 * Because replication command execution can involve beginning or ending
2010 * transactions, we need a working context that will survive that, so we
2011 * make it a child of TopMemoryContext. That in turn creates a hazard of
2012 * long-lived memory leaks if we lose track of the working context. We
2013 * deal with that by creating it only once per walsender, and resetting it
2014 * for each new command. (Normally this reset is a no-op, but if the
2015 * prior exec_replication_command call failed with an error, it won't be.)
2016 *
2017 * This is subtler than it looks. The transactions we manage can extend
2018 * across replication commands, indeed SnapBuildClearExportedSnapshot
2019 * might have just ended one. Because transaction exit will revert to the
2020 * memory context that was current at transaction start, we need to be
2021 * sure that that context is still valid. That motivates re-using the
2022 * same cmd_context rather than making a new one each time.
2023 */
2024 if (cmd_context == NULL)
2026 "Replication command context",
2028 else
2029 MemoryContextReset(cmd_context);
2030
2031 MemoryContextSwitchTo(cmd_context);
2032
2033 replication_scanner_init(cmd_string, &scanner);
2034
2035 /*
2036 * Is it a WalSender command?
2037 */
2039 {
2040 /* Nope; clean up and get out. */
2042
2043 MemoryContextSwitchTo(old_context);
2044 MemoryContextReset(cmd_context);
2045
2046 /* XXX this is a pretty random place to make this check */
2047 if (MyDatabaseId == InvalidOid)
2048 ereport(ERROR,
2049 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2050 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2051
2052 /* Tell the caller that this wasn't a WalSender command. */
2053 return false;
2054 }
2055
2056 /*
2057 * Looks like a WalSender command, so parse it.
2058 */
2059 parse_rc = replication_yyparse(&cmd_node, scanner);
2060 if (parse_rc != 0)
2061 ereport(ERROR,
2062 (errcode(ERRCODE_SYNTAX_ERROR),
2063 errmsg_internal("replication command parser returned %d",
2064 parse_rc)));
2066
2067 /*
2068 * Report query to various monitoring facilities. For this purpose, we
2069 * report replication commands just like SQL commands.
2070 */
2071 debug_query_string = cmd_string;
2072
2074
2075 /*
2076 * Log replication command if log_replication_commands is enabled. Even
2077 * when it's disabled, log the command with DEBUG1 level for backward
2078 * compatibility.
2079 */
2081 (errmsg("received replication command: %s", cmd_string)));
2082
2083 /*
2084 * Disallow replication commands in aborted transaction blocks.
2085 */
2087 ereport(ERROR,
2088 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2089 errmsg("current transaction is aborted, "
2090 "commands ignored until end of transaction block")));
2091
2093
2094 /*
2095 * Allocate buffers that will be used for each outgoing and incoming
2096 * message. We do this just once per command to reduce palloc overhead.
2097 */
2101
2102 switch (cmd_node->type)
2103 {
2104 case T_IdentifySystemCmd:
2105 cmdtag = "IDENTIFY_SYSTEM";
2106 set_ps_display(cmdtag);
2108 EndReplicationCommand(cmdtag);
2109 break;
2110
2111 case T_ReadReplicationSlotCmd:
2112 cmdtag = "READ_REPLICATION_SLOT";
2113 set_ps_display(cmdtag);
2115 EndReplicationCommand(cmdtag);
2116 break;
2117
2118 case T_BaseBackupCmd:
2119 cmdtag = "BASE_BACKUP";
2120 set_ps_display(cmdtag);
2121 PreventInTransactionBlock(true, cmdtag);
2123 EndReplicationCommand(cmdtag);
2124 break;
2125
2126 case T_CreateReplicationSlotCmd:
2127 cmdtag = "CREATE_REPLICATION_SLOT";
2128 set_ps_display(cmdtag);
2130 EndReplicationCommand(cmdtag);
2131 break;
2132
2133 case T_DropReplicationSlotCmd:
2134 cmdtag = "DROP_REPLICATION_SLOT";
2135 set_ps_display(cmdtag);
2137 EndReplicationCommand(cmdtag);
2138 break;
2139
2140 case T_AlterReplicationSlotCmd:
2141 cmdtag = "ALTER_REPLICATION_SLOT";
2142 set_ps_display(cmdtag);
2144 EndReplicationCommand(cmdtag);
2145 break;
2146
2147 case T_StartReplicationCmd:
2148 {
2149 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2150
2151 cmdtag = "START_REPLICATION";
2152 set_ps_display(cmdtag);
2153 PreventInTransactionBlock(true, cmdtag);
2154
2155 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2156 StartReplication(cmd);
2157 else
2159
2160 /* dupe, but necessary per libpqrcv_endstreaming */
2161 EndReplicationCommand(cmdtag);
2162
2163 Assert(xlogreader != NULL);
2164 break;
2165 }
2166
2167 case T_TimeLineHistoryCmd:
2168 cmdtag = "TIMELINE_HISTORY";
2169 set_ps_display(cmdtag);
2170 PreventInTransactionBlock(true, cmdtag);
2172 EndReplicationCommand(cmdtag);
2173 break;
2174
2175 case T_VariableShowStmt:
2176 {
2178 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2179
2180 cmdtag = "SHOW";
2181 set_ps_display(cmdtag);
2182
2183 /* syscache access needs a transaction environment */
2185 GetPGVariable(n->name, dest);
2187 EndReplicationCommand(cmdtag);
2188 }
2189 break;
2190
2191 case T_UploadManifestCmd:
2192 cmdtag = "UPLOAD_MANIFEST";
2193 set_ps_display(cmdtag);
2194 PreventInTransactionBlock(true, cmdtag);
2196 EndReplicationCommand(cmdtag);
2197 break;
2198
2199 default:
2200 elog(ERROR, "unrecognized replication command node tag: %u",
2201 cmd_node->type);
2202 }
2203
2204 /*
2205 * Done. Revert to caller's memory context, and clean out the cmd_context
2206 * to recover memory right away.
2207 */
2208 MemoryContextSwitchTo(old_context);
2209 MemoryContextReset(cmd_context);
2210
2211 /*
2212 * We need not update ps display or pg_stat_activity, because PostgresMain
2213 * will reset those to "idle". But we must reset debug_query_string to
2214 * ensure it doesn't become a dangling pointer.
2215 */
2216 debug_query_string = NULL;
2217
2218 return true;
2219}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:67
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:95
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:414
MemoryContext TopMemoryContext
Definition: mcxt.c:165
MemoryContext CurrentMemoryContext
Definition: mcxt.c:159
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:88
#define InvalidOid
Definition: postgres_ext.h:35
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1391
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:563
WalSnd * MyWalSnd
Definition: walsender.c:117
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:464
static StringInfoData tmpbuf
Definition: walsender.c:175
static void IdentifySystem(void)
Definition: walsender.c:383
static StringInfoData reply_message
Definition: walsender.c:174
void WalSndSetState(WalSndState state)
Definition: walsender.c:3838
static StringInfoData output_message
Definition: walsender.c:173
static void UploadManifest(void)
Definition: walsender.c:653
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:203
bool log_replication_commands
Definition: walsender.c:130
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1177
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1433
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:152
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1382
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:795
static XLogReaderState * xlogreader
Definition: walsender.c:142
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3648
void StartTransactionCommand(void)
Definition: xact.c:3059
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:407
void CommitTransactionCommand(void)
Definition: xact.c:3157

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID tli)

Definition at line 3533 of file walsender.c.

3534{
3535 XLogRecPtr replayPtr;
3536 TimeLineID replayTLI;
3537 XLogRecPtr receivePtr;
3539 XLogRecPtr result;
3540
3542
3543 /*
3544 * We can safely send what's already been replayed. Also, if walreceiver
3545 * is streaming WAL from the same timeline, we can send anything that it
3546 * has streamed, but hasn't been replayed yet.
3547 */
3548
3549 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3550 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3551
3552 if (tli)
3553 *tli = replayTLI;
3554
3555 result = replayPtr;
3556 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3557 result = receivePtr;
3558
3559 return result;
3560}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1668
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:121
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:264
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *  offset,
IncrementalBackupInfo ib 
)
static

Definition at line 719 of file walsender.c.

721{
722 int mtype;
723 int maxmsglen;
724
726
728 mtype = pq_getbyte();
729 if (mtype == EOF)
731 (errcode(ERRCODE_CONNECTION_FAILURE),
732 errmsg("unexpected EOF on client connection with an open transaction")));
733
734 switch (mtype)
735 {
736 case 'd': /* CopyData */
737 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
738 break;
739 case 'c': /* CopyDone */
740 case 'f': /* CopyFail */
741 case 'H': /* Flush */
742 case 'S': /* Sync */
743 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
744 break;
745 default:
747 (errcode(ERRCODE_PROTOCOL_VIOLATION),
748 errmsg("unexpected message type 0x%02X during COPY from stdin",
749 mtype)));
750 maxmsglen = 0; /* keep compiler quiet */
751 break;
752 }
753
754 /* Now collect the message body */
755 if (pq_getmessage(buf, maxmsglen))
757 (errcode(ERRCODE_CONNECTION_FAILURE),
758 errmsg("unexpected EOF on client connection with an open transaction")));
760
761 /* Process the message */
762 switch (mtype)
763 {
764 case 'd': /* CopyData */
765 AppendIncrementalManifestData(ib, buf->data, buf->len);
766 return true;
767
768 case 'c': /* CopyDone */
769 return false;
770
771 case 'H': /* Flush */
772 case 'S': /* Sync */
773 /* Ignore these while in CopyOut mode as we do elsewhere. */
774 return true;
775
776 case 'f':
778 (errcode(ERRCODE_QUERY_CANCELED),
779 errmsg("COPY from stdin failed: %s",
781 }
782
783 /* Not reached. */
784 Assert(false);
785 return false;
786}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:142
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:144
static char * buf
Definition: pg_test_fsync.c:72
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1204
int pq_getbyte(void)
Definition: pqcomm.c:964
void pq_startmsgread(void)
Definition: pqcomm.c:1142
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579

References AppendIncrementalManifestData(), Assert(), buf, ereport, errcode(), errmsg(), ERROR, HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3589 of file walsender.c.

3590{
3592
3593 /*
3594 * If replication has not yet started, die like with SIGTERM. If
3595 * replication is active, only set a flag and wake up the main loop. It
3596 * will send any outstanding WAL, wait for it to be replicated to the
3597 * standby, and then exit gracefully.
3598 */
3599 if (!replication_active)
3600 kill(MyProcPid, SIGTERM);
3601 else
3602 got_STOPPING = true;
3603}
int MyProcPid
Definition: globals.c:48
bool am_walsender
Definition: walsender.c:120
static volatile sig_atomic_t replication_active
Definition: walsender.c:211
#define kill(pid, sig)
Definition: win32_port.h:493

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 383 of file walsender.c.

384{
385 char sysid[32];
386 char xloc[MAXFNAMELEN];
387 XLogRecPtr logptr;
388 char *dbname = NULL;
390 TupOutputState *tstate;
391 TupleDesc tupdesc;
392 Datum values[4];
393 bool nulls[4] = {0};
394 TimeLineID currTLI;
395
396 /*
397 * Reply with a result set with one row, four columns. First col is system
398 * ID, second is timeline ID, third is current xlog location and the
399 * fourth contains the database name if we are connected to one.
400 */
401
402 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
404
407 logptr = GetStandbyFlushRecPtr(&currTLI);
408 else
409 logptr = GetFlushRecPtr(&currTLI);
410
411 snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
412
414 {
416
417 /* syscache access needs a transaction env. */
420 /* copy dbname out of TX context */
423 }
424
426
427 /* need a tuple descriptor representing four columns */
428 tupdesc = CreateTemplateTupleDesc(4);
429 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
430 TEXTOID, -1, 0);
431 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
432 INT8OID, -1, 0);
433 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
434 TEXTOID, -1, 0);
435 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
436 TEXTOID, -1, 0);
437
438 /* prepare for projection of tuples */
439 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
440
441 /* column 1: system identifier */
442 values[0] = CStringGetTextDatum(sysid);
443
444 /* column 2: timeline */
445 values[1] = Int64GetDatum(currTLI);
446
447 /* column 3: wal location */
448 values[2] = CStringGetTextDatum(xloc);
449
450 /* column 4: database name, or NULL if none */
451 if (dbname)
453 else
454 nulls[3] = true;
455
456 /* send it to dest */
457 do_tup_output(tstate, values, nulls);
458
459 end_tup_output(tstate);
460}
#define UINT64_FORMAT
Definition: c.h:521
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3188
struct cursor * cur
Definition: ecpg.c:29
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:2314
char * dbname
Definition: streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3533
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4734
bool RecoveryInProgress(void)
Definition: xlog.c:6522
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6687

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 283 of file walsender.c.

284{
286
287 /* Create a per-walsender data structure in shared memory */
289
290 /* need resource owner for e.g. basebackups */
292
293 /*
294 * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
295 * a WAL sender process, postmaster will let us outlive the bgwriter and
296 * kill us last in the shutdown sequence, so we get a chance to stream all
297 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
298 * there's no going back, and we mustn't write any WAL records after this.
299 */
302
303 /*
304 * If the client didn't specify a database to connect to, show in PGPROC
305 * that our advertised xmin should affect vacuum horizons in all
306 * databases. This allows physical replication clients to send hot
307 * standby feedback that will delay vacuum cleanup in all databases.
308 */
310 {
312 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
315 LWLockRelease(ProcArrayLock);
316 }
317
318 /* Initialize empty timestamp buffer for lag tracking. */
320}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1294
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:999
PROC_HDR * ProcGlobal
Definition: proc.c:79
TransactionId xmin
Definition: proc.h:186
uint8 statusFlags
Definition: proc.h:251
int pgxactoff
Definition: proc.h:193
uint8 * statusFlags
Definition: proc.h:395
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:2925
static LagTracker * lag_tracker
Definition: walsender.c:235

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2925 of file walsender.c.

2926{
2927 int i;
2928
2929 /*
2930 * WalSndCtl should be set up already (we inherit this by fork() or
2931 * EXEC_BACKEND mechanism from the postmaster).
2932 */
2933 Assert(WalSndCtl != NULL);
2934 Assert(MyWalSnd == NULL);
2935
2936 /*
2937 * Find a free walsender slot and reserve it. This cannot fail, because
2938 * of the prior check for free WAL senders in InitProcess().
2939 */
2940 for (i = 0; i < max_wal_senders; i++)
2941 {
2942 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2943
2944 SpinLockAcquire(&walsnd->mutex);
2945
2946 if (walsnd->pid != 0)
2947 {
2948 SpinLockRelease(&walsnd->mutex);
2949 continue;
2950 }
2951 else
2952 {
2953 /*
2954 * Found a free slot. Reserve it for us.
2955 */
2956 walsnd->pid = MyProcPid;
2957 walsnd->state = WALSNDSTATE_STARTUP;
2958 walsnd->sentPtr = InvalidXLogRecPtr;
2959 walsnd->needreload = false;
2960 walsnd->write = InvalidXLogRecPtr;
2961 walsnd->flush = InvalidXLogRecPtr;
2962 walsnd->apply = InvalidXLogRecPtr;
2963 walsnd->writeLag = -1;
2964 walsnd->flushLag = -1;
2965 walsnd->applyLag = -1;
2966 walsnd->sync_standby_priority = 0;
2967 walsnd->replyTime = 0;
2968
2969 /*
2970 * The kind assignment is done here and not in StartReplication()
2971 * and StartLogicalReplication(). Indeed, the logical walsender
2972 * needs to read WAL records (like snapshot of running
2973 * transactions) during the slot creation. So it needs to be woken
2974 * up based on its kind.
2975 *
2976 * The kind assignment could also be done in StartReplication(),
2977 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2978 * seems better to set it in one place.
2979 */
2980 if (MyDatabaseId == InvalidOid)
2982 else
2984
2985 SpinLockRelease(&walsnd->mutex);
2986 /* don't need the lock anymore */
2987 MyWalSnd = (WalSnd *) walsnd;
2988
2989 break;
2990 }
2991 }
2992
2993 Assert(MyWalSnd != NULL);
2994
2995 /* Arrange to clean up at walsender exit */
2997}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:77
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:126
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:3001
WalSndCtlData * WalSndCtl
Definition: walsender.c:114
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert(), WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4189 of file walsender.c.

4190{
4191 TimestampTz time = 0;
4192
4193 /* Read all unread samples up to this LSN or end of buffer. */
4194 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4196 {
4198 lag_tracker->last_read[head] =
4200 lag_tracker->read_heads[head] =
4202 }
4203
4204 /*
4205 * If the lag tracker is empty, that means the standby has processed
4206 * everything we've ever sent so we should now clear 'last_read'. If we
4207 * didn't do that, we'd risk using a stale and irrelevant sample for
4208 * interpolation at the beginning of the next burst of WAL after a period
4209 * of idleness.
4210 */
4212 lag_tracker->last_read[head].time = 0;
4213
4214 if (time > now)
4215 {
4216 /* If the clock somehow went backwards, treat as not found. */
4217 return -1;
4218 }
4219 else if (time == 0)
4220 {
4221 /*
4222 * We didn't cross a time. If there is a future sample that we
4223 * haven't reached yet, and we've already reached at least one sample,
4224 * let's interpolate the local flushed time. This is mainly useful
4225 * for reporting a completely stuck apply position as having
4226 * increasing lag, since otherwise we'd have to wait for it to
4227 * eventually start moving again and cross one of our samples before
4228 * we can show the lag increasing.
4229 */
4231 {
4232 /* There are no future samples, so we can't interpolate. */
4233 return -1;
4234 }
4235 else if (lag_tracker->last_read[head].time != 0)
4236 {
4237 /* We can interpolate between last_read and the next sample. */
4238 double fraction;
4239 WalTimeSample prev = lag_tracker->last_read[head];
4241
4242 if (lsn < prev.lsn)
4243 {
4244 /*
4245 * Reported LSNs shouldn't normally go backwards, but it's
4246 * possible when there is a timeline change. Treat as not
4247 * found.
4248 */
4249 return -1;
4250 }
4251
4252 Assert(prev.lsn < next.lsn);
4253
4254 if (prev.time > next.time)
4255 {
4256 /* If the clock somehow went backwards, treat as not found. */
4257 return -1;
4258 }
4259
4260 /* See how far we are between the previous and next samples. */
4261 fraction =
4262 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4263
4264 /* Scale the local flush time proportionally. */
4265 time = (TimestampTz)
4266 ((double) prev.time + (next.time - prev.time) * fraction);
4267 }
4268 else
4269 {
4270 /*
4271 * We have only a future sample, implying that we were entirely
4272 * caught up, but now there is a new burst of WAL and the
4273 * standby hasn't processed the first sample yet. Until the
4274 * standby reaches the future sample the best we can do is report
4275 * the hypothetical lag if that sample were to be replayed now.
4276 */
4278 }
4279 }
4280
4281 /* Return the elapsed time since local flush time in microseconds. */
4282 Assert(time != 0);
4283 return now - time;
4284}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
static int32 next
Definition: blutils.c:224
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:229
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:231
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:232
int write_head
Definition: walsender.c:230
TimestampTz time
Definition: walsender.c:219
XLogRecPtr lsn
Definition: walsender.c:218
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:223

References Assert(), LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4124 of file walsender.c.

4125{
4126 bool buffer_full;
4127 int new_write_head;
4128 int i;
4129
4130 if (!am_walsender)
4131 return;
4132
4133 /*
4134 * If the lsn hasn't advanced since last time, then do nothing. This way
4135 * we only record a new sample when new WAL has been written.
4136 */
4137 if (lag_tracker->last_lsn == lsn)
4138 return;
4139 lag_tracker->last_lsn = lsn;
4140
4141 /*
4142 * If advancing the write head of the circular buffer would crash into any
4143 * of the read heads, then the buffer is full. In other words, the
4144 * slowest reader (presumably apply) is the one that controls the release
4145 * of space.
4146 */
4147 new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4148 buffer_full = false;
4149 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4150 {
4151 if (new_write_head == lag_tracker->read_heads[i])
4152 buffer_full = true;
4153 }
4154
4155 /*
4156 * If the buffer is full, for now we just rewind by one slot and overwrite
4157 * the last sample, as a simple (if somewhat uneven) way to lower the
4158 * sampling rate. There may be better adaptive compaction algorithms.
4159 */
4160 if (buffer_full)
4161 {
4162 new_write_head = lag_tracker->write_head;
4163 if (lag_tracker->write_head > 0)
4165 else
4167 }
4168
4169 /* Store a sample at the current write head position. */
4171 lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4172 lag_tracker->write_head = new_write_head;
4173}
XLogRecPtr last_lsn
Definition: walsender.c:228
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1027 of file walsender.c.

1029{
1030 XLogRecPtr flushptr;
1031 int count;
1032 WALReadError errinfo;
1033 XLogSegNo segno;
1034 TimeLineID currTLI;
1035
1036 /*
1037 * Make sure we have enough WAL available before retrieving the current
1038 * timeline.
1039 */
1040 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1041
1042 /* Fail if not enough (implies we are going to shut down) */
1043 if (flushptr < targetPagePtr + reqLen)
1044 return -1;
1045
1046 /*
1047 * Since logical decoding is also permitted on a standby server, we need
1048 * to check if the server is in recovery to decide how to get the current
1049 * timeline ID (so that it also covers the promotion or timeline change
1050 * cases). We must determine am_cascading_walsender after waiting for the
1051 * required WAL so that it is correct when the walsender wakes up after a
1052 * promotion.
1053 */
1055
1057 GetXLogReplayRecPtr(&currTLI);
1058 else
1059 currTLI = GetWALInsertionTimeLine();
1060
1061 XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1062 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1063 sendTimeLine = state->currTLI;
1064 sendTimeLineValidUpto = state->currTLIValidUntil;
1065 sendTimeLineNextTLI = state->nextTLI;
1066
1067 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1068 count = XLOG_BLCKSZ; /* at least one full block available */
1069 else
1070 count = flushptr - targetPagePtr; /* part of the page available */
1071
1072 /* now actually read the data, we know it's there */
1073 if (!WALRead(state,
1074 cur_page,
1075 targetPagePtr,
1076 count,
1077 currTLI, /* Pass the current TLI because only
1078 * WalSndSegmentOpen controls whether new TLI
1079 * is needed. */
1080 &errinfo))
1081 WALReadRaiseError(&errinfo);
1082
1083 /*
1084 * After reading into the buffer, check that what we read was valid. We do
1085 * this after reading, because even though the segment was present when we
1086 * opened it, it might get recycled or removed while we read it. The
1087 * read() succeeds in that case, but the data we tried to read might
1088 * already have been overwritten with new WAL records.
1089 */
1090 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1091 CheckXLogRemoved(segno, state->seg.ws_tli);
1092
1093 return count;
1094}
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:161
static bool sendTimeLineIsHistoric
Definition: walsender.c:163
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1799
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:162
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:164
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6708
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3866
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1504
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 wait_event 
)
static

Definition at line 1739 of file walsender.c.

1740{
1741 int elevel = got_STOPPING ? ERROR : WARNING;
1742 bool failover_slot;
1743
1744 failover_slot = (replication_active && MyReplicationSlot->data.failover);
1745
1746 /*
1747 * Note that after receiving the shutdown signal, an ERROR is reported if
1748 * any slots are dropped, invalidated, or inactive. This measure is taken
1749 * to prevent the walsender from waiting indefinitely.
1750 */
1751 if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
1752 {
1753 *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1754 return true;
1755 }
1756
1757 *wait_event = 0;
1758 return false;
1759}
#define WARNING
Definition: elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2805

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 wait_event 
)
static

Definition at line 1771 of file walsender.c.

1773{
1774 /* Check if we need to wait for WALs to be flushed to disk */
1775 if (target_lsn > flushed_lsn)
1776 {
1777 *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1778 return true;
1779 }
1780
1781 /* Check if the standby slots have caught up to the flushed position */
1782 return NeedToWaitForStandbys(flushed_lsn, wait_event);
1783}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1739

References NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 3876 of file walsender.c.

3877{
3878 Interval *result = palloc(sizeof(Interval));
3879
3880 result->month = 0;
3881 result->day = 0;
3882 result->time = offset;
3883
3884 return result;
3885}
void * palloc(Size size)
Definition: mcxt.c:1945
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool *  reserve_wal,
CRSSnapshotAction snapshot_action,
bool *  two_phase,
bool *  failover 
)
static

Definition at line 1100 of file walsender.c.

1104{
1105 ListCell *lc;
1106 bool snapshot_action_given = false;
1107 bool reserve_wal_given = false;
1108 bool two_phase_given = false;
1109 bool failover_given = false;
1110
1111 /* Parse options */
1112 foreach(lc, cmd->options)
1113 {
1114 DefElem *defel = (DefElem *) lfirst(lc);
1115
1116 if (strcmp(defel->defname, "snapshot") == 0)
1117 {
1118 char *action;
1119
1120 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1121 ereport(ERROR,
1122 (errcode(ERRCODE_SYNTAX_ERROR),
1123 errmsg("conflicting or redundant options")));
1124
1125 action = defGetString(defel);
1126 snapshot_action_given = true;
1127
1128 if (strcmp(action, "export") == 0)
1129 *snapshot_action = CRS_EXPORT_SNAPSHOT;
1130 else if (strcmp(action, "nothing") == 0)
1131 *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1132 else if (strcmp(action, "use") == 0)
1133 *snapshot_action = CRS_USE_SNAPSHOT;
1134 else
1135 ereport(ERROR,
1136 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1137 errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1138 defel->defname, action)));
1139 }
1140 else if (strcmp(defel->defname, "reserve_wal") == 0)
1141 {
1142 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1143 ereport(ERROR,
1144 (errcode(ERRCODE_SYNTAX_ERROR),
1145 errmsg("conflicting or redundant options")));
1146
1147 reserve_wal_given = true;
1148 *reserve_wal = defGetBoolean(defel);
1149 }
1150 else if (strcmp(defel->defname, "two_phase") == 0)
1151 {
1152 if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1153 ereport(ERROR,
1154 (errcode(ERRCODE_SYNTAX_ERROR),
1155 errmsg("conflicting or redundant options")));
1156 two_phase_given = true;
1157 *two_phase = defGetBoolean(defel);
1158 }
1159 else if (strcmp(defel->defname, "failover") == 0)
1160 {
1161 if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1162 ereport(ERROR,
1163 (errcode(ERRCODE_SYNTAX_ERROR),
1164 errmsg("conflicting or redundant options")));
1165 failover_given = true;
1166 *failover = defGetBoolean(defel);
1167 }
1168 else
1169 elog(ERROR, "unrecognized option: %s", defel->defname);
1170 }
1171}
char * defGetString(DefElem *def)
Definition: define.c:35
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:826
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, failover, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3892 of file walsender.c.

3893{
3894#define PG_STAT_GET_WAL_SENDERS_COLS 12
3895 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3896 SyncRepStandbyData *sync_standbys;
3897 int num_standbys;
3898 int i;
3899
3900 InitMaterializedSRF(fcinfo, 0);
3901
3902 /*
3903 * Get the currently active synchronous standbys. This could be out of
3904 * date before we're done, but we'll use the data anyway.
3905 */
3906 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3907
3908 for (i = 0; i < max_wal_senders; i++)
3909 {
3910 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3911 XLogRecPtr sent_ptr;
3912 XLogRecPtr write;
3913 XLogRecPtr flush;
3914 XLogRecPtr apply;
3915 TimeOffset writeLag;
3916 TimeOffset flushLag;
3917 TimeOffset applyLag;
3918 int priority;
3919 int pid;
3920 WalSndState state;
3921 TimestampTz replyTime;
3922 bool is_sync_standby;
3923 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3924 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3925 int j;
3926
3927 /* Collect data from shared memory */
3928 SpinLockAcquire(&walsnd->mutex);
3929 if (walsnd->pid == 0)
3930 {
3931 SpinLockRelease(&walsnd->mutex);
3932 continue;
3933 }
3934 pid = walsnd->pid;
3935 sent_ptr = walsnd->sentPtr;
3936 state = walsnd->state;
3937 write = walsnd->write;
3938 flush = walsnd->flush;
3939 apply = walsnd->apply;
3940 writeLag = walsnd->writeLag;
3941 flushLag = walsnd->flushLag;
3942 applyLag = walsnd->applyLag;
3943 priority = walsnd->sync_standby_priority;
3944 replyTime = walsnd->replyTime;
3945 SpinLockRelease(&walsnd->mutex);
3946
3947 /*
3948 * Detect whether walsender is/was considered synchronous. We can
3949 * provide some protection against stale data by checking the PID
3950 * along with walsnd_index.
3951 */
3952 is_sync_standby = false;
3953 for (j = 0; j < num_standbys; j++)
3954 {
3955 if (sync_standbys[j].walsnd_index == i &&
3956 sync_standbys[j].pid == pid)
3957 {
3958 is_sync_standby = true;
3959 break;
3960 }
3961 }
3962
3963 values[0] = Int32GetDatum(pid);
3964
3965 if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3966 {
3967 /*
3968 * Only superusers and roles with privileges of pg_read_all_stats
3969 * can see details. Other users only get the pid value to know
3970 * it's a walsender, but no details.
3971 */
3972 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3973 }
3974 else
3975 {
3976 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3977
3978 if (XLogRecPtrIsInvalid(sent_ptr))
3979 nulls[2] = true;
3980 values[2] = LSNGetDatum(sent_ptr);
3981
3982 if (XLogRecPtrIsInvalid(write))
3983 nulls[3] = true;
3984 values[3] = LSNGetDatum(write);
3985
3986 if (XLogRecPtrIsInvalid(flush))
3987 nulls[4] = true;
3988 values[4] = LSNGetDatum(flush);
3989
3990 if (XLogRecPtrIsInvalid(apply))
3991 nulls[5] = true;
3992 values[5] = LSNGetDatum(apply);
3993
3994 /*
3995 * Treat a standby such as a pg_basebackup background process
3996 * which always returns an invalid flush location, as an
3997 * asynchronous standby.
3998 */
3999 priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
4000
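4001 if (writeLag < 0)
4002 nulls[6] = true;
4003 else
4004 values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
4005
4006 if (flushLag < 0)
4007 nulls[7] = true;
4008 else
4009 values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
4010
4011 if (applyLag < 0)
4012 nulls[8] = true;
4013 else
4014 values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
4015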
4016 values[9] = Int32GetDatum(priority);
4017
4018 /*
4019 * More easily understood version of standby state. This is purely
4020 * informational.
4021 *
4022 * In quorum-based sync replication, the role of each standby
4023 * listed in synchronous_standby_names can be changing very
4024 * frequently. Any standbys considered as "sync" at one moment can
4025 * be switched to "potential" ones at the next moment. So, it's
4026 * basically useless to report "sync" or "potential" as their sync
4027 * states. We report just "quorum" for them.
4028 */
4029 if (priority == 0)
4030 values[10] = CStringGetTextDatum("async");
4031 else if (is_sync_standby)
4032 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
4033 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4034 else
4035 values[10] = CStringGetTextDatum("potential");
4036
4037 if (replyTime == 0)
4038 nulls[11] = true;
4039 else
4040 values[11] = TimestampTzGetDatum(replyTime);
4041 }
4042
4043 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4044 values, nulls);
4045 }
4046
4047 return (Datum) 0;
4048}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5268
#define MemSet(start, val, len)
Definition: c.h:991
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:78
Oid GetUserId(void)
Definition: miscinit.c:520
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:217
TupleDesc setDesc
Definition: execnodes.h:359
Tuplestorestate * setResult
Definition: execnodes.h:358
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:754
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3876
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3857
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2370 of file walsender.c.

2371{
2372 bool changed = false;
2373 ReplicationSlot *slot = MyReplicationSlot;
2374
2375 Assert(lsn != InvalidXLogRecPtr);
2376 SpinLockAcquire(&slot->mutex);
2377 if (slot->data.restart_lsn != lsn)
2378 {
2379 changed = true;
2380 slot->data.restart_lsn = lsn;
2381 }
2382 SpinLockRelease(&slot->mutex);
2383
2384 if (changed)
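2385 {
2386 ReplicationSlotMarkDirty();
2387 ReplicationSlotsComputeRequiredLSN();
2388 PhysicalWakeupLogicalWalSnd();
2389 }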
2390
2391 /*
2392 * One could argue that the slot should be saved to disk now, but that'd
2393 * be energy wasted - the worst thing lost information could cause here is
2394 * to give wrong information in a statistics view - we'll just potentially
2395 * be more conservative in removing files.
2396 */
2397}
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1156
slock_t mutex
Definition: slot.h:158
void PhysicalWakeupLogicalWalSnd(void)
Definition: walsender.c:1714

References Assert(), ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2508 of file walsender.c.

2509{
2510 bool changed = false;
2511 ReplicationSlot *slot = MyReplicationSlot;
2512
2513 SpinLockAcquire(&slot->mutex);
2514 MyProc->xmin = InvalidTransactionId;
2515
2516 /*
2517 * For physical replication we don't need the interlock provided by xmin
2518 * and effective_xmin since the consequences of a missed increase are
2519 * limited to query cancellations, so set both at once.
2520 */
2521 if (!TransactionIdIsNormal(slot->data.xmin) ||
2522 !TransactionIdIsNormal(feedbackXmin) ||
2523 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2524 {
2525 changed = true;
2526 slot->data.xmin = feedbackXmin;
2527 slot->effective_xmin = feedbackXmin;
2528 }
2529 if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2530 !TransactionIdIsNormal(feedbackCatalogXmin) ||
2531 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2532 {
2533 changed = true;
2534 slot->data.catalog_xmin = feedbackCatalogXmin;
2535 slot->effective_catalog_xmin = feedbackCatalogXmin;
2536 }
2537 SpinLockRelease(&slot->mutex);
2538
2539 if (changed)
2540 {
2541 ReplicationSlotMarkDirty();
2542 ReplicationSlotsComputeRequiredXmin(false);
2543 }
2544}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1100
TransactionId xmin
Definition: slot.h:89
TransactionId catalog_xmin
Definition: slot.h:97
TransactionId effective_catalog_xmin
Definition: slot.h:182
TransactionId effective_xmin
Definition: slot.h:181
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1714 of file walsender.c.

1715{
1716 Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
1717
1718 /*
1719 * If we are running in a standby, there is no need to wake up walsenders.
1720 * This is because we do not support syncing slots to cascading standbys,
1721 * so, there are no walsenders waiting for standbys to catch up.
1722 */
1723 if (RecoveryInProgress())
1724 return;
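1725
1726 if (SlotExistsInSyncStandbySlots(NameStr(MyReplicationSlot->data.name)))
1727 ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
1728}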
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2772
#define SlotIsPhysical(slot)
Definition: slot.h:220
ConditionVariable wal_confirm_rcv_cv

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1595 of file walsender.c.

1596{
1597 for (;;)
1598 {
1599 long sleeptime;
1600
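1601 /* Check for input from the client */
1602 ProcessRepliesIfAny();
1603
1604 /* die if timeout was reached */
1605 WalSndCheckTimeOut();
1606
1607 /* Send keepalive if the time has come */
1608 WalSndKeepaliveIfNecessary();
1609
1610 if (!pq_is_send_pending())
1611 break;
1612
1613 sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1614
1615 /* Sleep until something happens or we time out */
1616 WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1617 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1618
1619 /* Clear any already-pending wakeups */
1620 ResetLatch(MyLatch);
1621
1622 CHECK_FOR_INTERRUPTS();
1623
1624 /* Process any requests or signals received recently */
1625 if (ConfigReloadPending)
1626 {
1627 ConfigReloadPending = false;
1628 ProcessConfigFile(PGC_SIGHUP);
1629 SyncRepInitConfig();
1630 }
1631
1632 /* Try to flush pending output to the client */
1633 if (pq_flush_if_writable() != 0)
1634 WalSndShutdown();
1635 }
1636
1637 /* reactivate latch so WalSndLoop knows to continue */
1638 SetLatch(MyLatch);
1639}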
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
struct Latch * MyLatch
Definition: globals.c:64
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:288
void ResetLatch(Latch *latch)
Definition: latch.c:372
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:445
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_SOCKET_WRITEABLE
Definition: waiteventset.h:36
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3716
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2756
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2226
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:4086
static pg_noreturn void WalSndShutdown(void)
Definition: walsender.c:366
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2712

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2226 of file walsender.c.

2227{
2228 unsigned char firstchar;
2229 int maxmsglen;
2230 int r;
2231 bool received = false;
2232
2233 last_processing = GetCurrentTimestamp();
2234
2235 /*
2236 * If we already received a CopyDone from the frontend, any subsequent
2237 * message is the beginning of a new command, and should be processed in
2238 * the main processing loop.
2239 */
2240 while (!streamingDoneReceiving)
2241 {
2242 pq_startmsgread();
2243 r = pq_getbyte_if_available(&firstchar);
2244 if (r < 0)
2245 {
2246 /* unexpected error or EOF */
2247 ereport(COMMERROR,
2248 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2249 errmsg("unexpected EOF on standby connection")));
2250 proc_exit(0);
2251 }
2252 if (r == 0)
2253 {
2254 /* no data available without blocking */
2255 pq_endmsgread();
2256 break;
2257 }
2258
2259 /* Validate message type and set packet size limit */
2260 switch (firstchar)
2261 {
2262 case PqMsg_CopyData:
2263 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2264 break;
2265 case PqMsg_CopyDone:
2266 case PqMsg_Terminate:
2267 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2268 break;
2269 default:
2270 ereport(FATAL,
2271 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2272 errmsg("invalid standby message type \"%c\"",
2273 firstchar)));
2274 maxmsglen = 0; /* keep compiler quiet */
2275 break;
2276 }
2277
2278 /* Read the message contents */
2279 resetStringInfo(&reply_message);
2280 if (pq_getmessage(&reply_message, maxmsglen))
2281 {
2282 ereport(COMMERROR,
2283 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2284 errmsg("unexpected EOF on standby connection")));
2285 proc_exit(0);
2286 }
2287
2288 /* ... and process it */
2289 switch (firstchar)
2290 {
2291 /*
2292 * 'd' means a standby reply wrapped in a CopyData packet.
2293 */
2294 case PqMsg_CopyData:
2295 ProcessStandbyMessage();
2296 received = true;
2297 break;
2298
2299 /*
2300 * CopyDone means the standby requested to finish streaming.
2301 * Reply with CopyDone, if we had not sent that already.
2302 */
2303 case PqMsg_CopyDone:
2304 if (!streamingDoneSending)
2305 {
2306 pq_putmessage_noblock('c', NULL, 0);
2307 streamingDoneSending = true;
2308 }
2309
2310 streamingDoneReceiving = true;
2311 received = true;
2312 break;
2313
2314 /*
2315 * 'X' means that the standby is closing down the socket.
2316 */
2317 case PqMsg_Terminate:
2318 proc_exit(0);
2319
2320 default:
2321 Assert(false); /* NOT REACHED */
2322 }
2323 }
2324
2325 /*
2326 * Save the last reply timestamp if we've received at least one reply.
2327 */
2328 if (received)
2329 {
2330 last_reply_timestamp = last_processing;
2331 waiting_for_ping_response = false;
2332 }
2333}
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1004
void pq_endmsgread(void)
Definition: pqcomm.c:1166
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
static bool waiting_for_ping_response
Definition: walsender.c:187
static TimestampTz last_processing
Definition: walsender.c:178
static bool streamingDoneSending
Definition: walsender.c:195
static void ProcessStandbyMessage(void)
Definition: walsender.c:2339
static bool streamingDoneReceiving
Definition: walsender.c:196

References Assert(), COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2588 of file walsender.c.

2589{
2590 TransactionId feedbackXmin;
2591 uint32 feedbackEpoch;
2592 TransactionId feedbackCatalogXmin;
2593 uint32 feedbackCatalogEpoch;
2594 TimestampTz replyTime;
2595
2596 /*
2597 * Decipher the reply message. The caller already consumed the msgtype
2598 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2599 * of this message.
2600 */
2601 replyTime = pq_getmsgint64(&reply_message);
2602 feedbackXmin = pq_getmsgint(&reply_message, 4);
2603 feedbackEpoch = pq_getmsgint(&reply_message, 4);
2604 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2605 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2606
2607 if (message_level_is_interesting(DEBUG2))
2608 {
2609 char *replyTimeStr;
2610
2611 /* Copy because timestamptz_to_str returns a static buffer */
2612 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2613
2614 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2615 feedbackXmin,
2616 feedbackEpoch,
2617 feedbackCatalogXmin,
2618 feedbackCatalogEpoch,
2619 replyTimeStr);
2620
2621 pfree(replyTimeStr);
2622 }
2623
2624 /*
2625 * Update shared state for this WalSender process based on reply data from
2626 * standby.
2627 */
2628 {
2629 WalSnd *walsnd = MyWalSnd;
2630
2631 SpinLockAcquire(&walsnd->mutex);
2632 walsnd->replyTime = replyTime;
2633 SpinLockRelease(&walsnd->mutex);
2634 }
2635
2636 /*
2637 * Unset WalSender's xmins if the feedback message values are invalid.
2638 * This happens when the downstream turned hot_standby_feedback off.
2639 */
2640 if (!TransactionIdIsNormal(feedbackXmin)
2641 && !TransactionIdIsNormal(feedbackCatalogXmin))
2642 {
2643 MyProc->xmin = InvalidTransactionId;
2644 if (MyReplicationSlot != NULL)
2645 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2646 return;
2647 }
2648
2649 /*
2650 * Check that the provided xmin/epoch are sane, that is, not in the future
2651 * and not so far back as to be already wrapped around. Ignore if not.
2652 */
2653 if (TransactionIdIsNormal(feedbackXmin) &&
2654 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2655 return;
2656
2657 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2658 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2659 return;
2660
2661 /*
2662 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2663 * the xmin will be taken into account by GetSnapshotData() /
2664 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2665 * thereby prevent the generation of cleanup conflicts on the standby
2666 * server.
2667 *
2668 * There is a small window for a race condition here: although we just
2669 * checked that feedbackXmin precedes nextXid, the nextXid could have
2670 * gotten advanced between our fetching it and applying the xmin below,
2671 * perhaps far enough to make feedbackXmin wrap around. In that case the
2672 * xmin we set here would be "in the future" and have no effect. No point
2673 * in worrying about this since it's too late to save the desired data
2674 * anyway. Assuming that the standby sends us an increasing sequence of
2675 * xmins, this could only happen during the first reply cycle, else our
2676 * own xmin would prevent nextXid from advancing so far.
2677 *
2678 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2679 * is assumed atomic, and there's no real need to prevent concurrent
2680 * horizon determinations. (If we're moving our xmin forward, this is
2681 * obviously safe, and if we're moving it backwards, well, the data is at
2682 * risk already since a VACUUM could already have determined the horizon.)
2683 *
2684 * If we're using a replication slot we reserve the xmin via that,
2685 * otherwise via the walsender's PGPROC entry. We can only track the
2686 * catalog xmin separately when using a slot, so we store the least of the
2687 * two provided when not using a slot.
2688 *
2689 * XXX: It might make sense to generalize the ephemeral slot concept and
2690 * always use the slot mechanism to handle the feedback xmin.
2691 */
2692 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2693 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2694 else
2695 {
2696 if (TransactionIdIsNormal(feedbackCatalogXmin)
2697 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2698 MyProc->xmin = feedbackCatalogXmin;
2699 else
2700 MyProc->xmin = feedbackXmin;
2701 }
2702}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
uint32_t uint32
Definition: c.h:502
uint32 TransactionId
Definition: c.h:623
bool message_level_is_interesting(int elevel)
Definition: elog.c:273
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:2327
void pfree(void *pointer)
Definition: mcxt.c:2152
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2508
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2557

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2339 of file walsender.c.

2340{
2341 char msgtype;
2342
2343 /*
2344 * Check message type from the first byte.
2345 */
2346 msgtype = pq_getmsgbyte(&reply_message);
2347
2348 switch (msgtype)
2349 {
2350 case 'r':
2351 ProcessStandbyReplyMessage();
2352 break;
2353
2354 case 'h':
2355 ProcessStandbyHSFeedbackMessage();
2356 break;
2357
2358 default:
2359 ereport(COMMERROR,
2360 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2361 errmsg("unexpected message type \"%c\"", msgtype)));
2362 proc_exit(0);
2363 }
2364}
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2588
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2403

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2403 of file walsender.c.

2404{
2405 XLogRecPtr writePtr,
2406 flushPtr,
2407 applyPtr;
2408 bool replyRequested;
2409 TimeOffset writeLag,
2410 flushLag,
2411 applyLag;
2412 bool clearLagTimes;
2413 TimestampTz now;
2414 TimestampTz replyTime;
2415
2416 static bool fullyAppliedLastTime = false;
2417
2418 /* the caller already consumed the msgtype byte */
2419 writePtr = pq_getmsgint64(&reply_message);
2420 flushPtr = pq_getmsgint64(&reply_message);
2421 applyPtr = pq_getmsgint64(&reply_message);
2422 replyTime = pq_getmsgint64(&reply_message);
2423 replyRequested = pq_getmsgbyte(&reply_message);
2424
2425 if (message_level_is_interesting(DEBUG2))
2426 {
2427 char *replyTimeStr;
2428
2429 /* Copy because timestamptz_to_str returns a static buffer */
2430 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2431
2432 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2433 LSN_FORMAT_ARGS(writePtr),
2434 LSN_FORMAT_ARGS(flushPtr),
2435 LSN_FORMAT_ARGS(applyPtr),
2436 replyRequested ? " (reply requested)" : "",
2437 replyTimeStr);
2438
2439 pfree(replyTimeStr);
2440 }
2441
2442 /* See if we can compute the round-trip lag for these positions. */
2443 now = GetCurrentTimestamp();
2444 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2445 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2446 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2447
2448 /*
2449 * If the standby reports that it has fully replayed the WAL in two
2450 * consecutive reply messages, then the second such message must result
2451 * from wal_receiver_status_interval expiring on the standby. This is a
2452 * convenient time to forget the lag times measured when it last
2453 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2454 * until more WAL traffic arrives.
2455 */
2456 clearLagTimes = false;
2457 if (applyPtr == sentPtr)
2458 {
2459 if (fullyAppliedLastTime)
2460 clearLagTimes = true;
2461 fullyAppliedLastTime = true;
2462 }
2463 else
2464 fullyAppliedLastTime = false;
2465
2466 /* Send a reply if the standby requested one. */
2467 if (replyRequested)
2468 WalSndKeepalive(false, InvalidXLogRecPtr);
2469
2470 /*
2471 * Update shared state for this WalSender process based on reply data from
2472 * standby.
2473 */
2474 {
2475 WalSnd *walsnd = MyWalSnd;
2476
2477 SpinLockAcquire(&walsnd->mutex);
2478 walsnd->write = writePtr;
2479 walsnd->flush = flushPtr;
2480 walsnd->apply = applyPtr;
2481 if (writeLag != -1 || clearLagTimes)
2482 walsnd->writeLag = writeLag;
2483 if (flushLag != -1 || clearLagTimes)
2484 walsnd->flushLag = flushLag;
2485 if (applyLag != -1 || clearLagTimes)
2486 walsnd->applyLag = applyLag;
2487 walsnd->replyTime = replyTime;
2488 SpinLockRelease(&walsnd->mutex);
2489 }
2490
2491 if (!am_cascading_walsender)
2492 SyncRepReleaseWaiters();
2493
2494 /*
2495 * Advance our local xmin horizon when the client confirmed a flush.
2496 */
2497 if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
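2498 {
2499 if (SlotIsLogical(MyReplicationSlot))
2500 LogicalConfirmReceivedLocation(flushPtr);
2501 else
2502 PhysicalConfirmReceivedLocation(flushPtr);
2503 }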
2504}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1818
#define SlotIsLogical(slot)
Definition: slot.h:221
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:474
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:170
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2370
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:4063
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4189

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 464 of file walsender.c.

465{
466#define READ_REPLICATION_SLOT_COLS 3
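467 ReplicationSlot *slot;
468 DestReceiver *dest;
469 TupOutputState *tstate;
470 TupleDesc tupdesc;
471 Datum values[READ_REPLICATION_SLOT_COLS] = {0};
472 bool nulls[READ_REPLICATION_SLOT_COLS];
473
474 tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
475 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",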
476 TEXTOID, -1, 0);
477 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
478 TEXTOID, -1, 0);
479 /* TimeLineID is unsigned, so int4 is not wide enough. */
480 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
481 INT8OID, -1, 0);
482
483 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
484
485 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
486 slot = SearchNamedReplicationSlot(cmd->slotname, false);
487 if (slot == NULL || !slot->in_use)
488 {
489 LWLockRelease(ReplicationSlotControlLock);
490 }
491 else
492 {
493 ReplicationSlot slot_contents;
494 int i = 0;
495
496 /* Copy slot contents while holding spinlock */
497 SpinLockAcquire(&slot->mutex);
498 slot_contents = *slot;
499 SpinLockRelease(&slot->mutex);
500 LWLockRelease(ReplicationSlotControlLock);
501
502 if (OidIsValid(slot_contents.data.database))
503 ereport(ERROR,
504 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
505 errmsg("cannot use %s with a logical replication slot",
506 "READ_REPLICATION_SLOT"));
507
508 /* slot type */
509 values[i] = CStringGetTextDatum("physical");
510 nulls[i] = false;
511 i++;
512
513 /* start LSN */
514 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
515 {
516 char xloc[64];
517
518 snprintf(xloc, sizeof(xloc), "%X/%X",
519 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
520 values[i] = CStringGetTextDatum(xloc);
521 nulls[i] = false;
522 }
523 i++;
524
525 /* timeline this WAL was produced on */
526 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
527 {
528 TimeLineID slots_position_timeline;
529 TimeLineID current_timeline;
530 List *timeline_history = NIL;
531
532 /*
533 * While in recovery, use as timeline the currently-replaying one
534 * to get the LSN position's history.
535 */
536 if (RecoveryInProgress())
537 (void) GetXLogReplayRecPtr(&current_timeline);
538 else
539 current_timeline = GetWALInsertionTimeLine();
540
541 timeline_history = readTimeLineHistory(current_timeline);
542 slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
543 timeline_history);
544 values[i] = Int64GetDatum((int64) slots_position_timeline);
545 nulls[i] = false;
546 }
547 i++;
548
549 Assert(i == READ_REPLICATION_SLOT_COLS);
550 }
551
552 dest = CreateDestReceiver(DestRemoteSimple);
553 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
554 do_tup_output(tstate, values, nulls);
555 end_tup_output(tstate);
556}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
int64_t int64
Definition: c.h:499
#define OidIsValid(objectId)
Definition: c.h:746
@ LW_SHARED
Definition: lwlock.h:115
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:479
Definition: pg_list.h:54
bool in_use
Definition: slot.h:161
#define READ_REPLICATION_SLOT_COLS

References Assert(), begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 563 of file walsender.c.

564{
566 TupleDesc tupdesc;
568 char histfname[MAXFNAMELEN];
569 char path[MAXPGPATH];
570 int fd;
571 off_t histfilelen;
572 off_t bytesleft;
573 Size len;
574
576
577 /*
578 * Reply with a result set with one row, and two columns. The first col is
579 * the name of the history file, 2nd is the contents.
580 */
581 tupdesc = CreateTemplateTupleDesc(2);
582 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
583 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
584
585 TLHistoryFileName(histfname, cmd->timeline);
586 TLHistoryFilePath(path, cmd->timeline);
587
588 /* Send a RowDescription message */
589 dest->rStartup(dest, CMD_SELECT, tupdesc);
590
591 /* Send a DataRow message */
593 pq_sendint16(&buf, 2); /* # of columns */
594 len = strlen(histfname);
595 pq_sendint32(&buf, len); /* col1 len */
596 pq_sendbytes(&buf, histfname, len);
597
598 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
599 if (fd < 0)
602 errmsg("could not open file \"%s\": %m", path)));
603
604 /* Determine file length and send it to client */
605 histfilelen = lseek(fd, 0, SEEK_END);
606 if (histfilelen < 0)
609 errmsg("could not seek to end of file \"%s\": %m", path)));
610 if (lseek(fd, 0, SEEK_SET) != 0)
613 errmsg("could not seek to beginning of file \"%s\": %m", path)));
614
615 pq_sendint32(&buf, histfilelen); /* col2 len */
616
617 bytesleft = histfilelen;
618 while (bytesleft > 0)
619 {
620 PGAlignedBlock rbuf;
621 int nread;
622
623 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
624 nread = read(fd, rbuf.data, sizeof(rbuf));
626 if (nread < 0)
629 errmsg("could not read file \"%s\": %m",
630 path)));
631 else if (nread == 0)
634 errmsg("could not read file \"%s\": read %d of %zu",
635 path, nread, (Size) bytesleft)));
636
637 pq_sendbytes(&buf, rbuf.data, nread);
638 bytesleft -= nread;
639 }
640
641 if (CloseTransientFile(fd) != 0)
644 errmsg("could not close file \"%s\": %m", path)));
645
647}
#define PG_BINARY
Definition: c.h:1244
size_t Size
Definition: c.h:576
int errcode_for_file_access(void)
Definition: elog.c:877
int CloseTransientFile(int fd)
Definition: fd.c:2871
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2694
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:271
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PqMsg_DataRow
Definition: protocol.h:43
TimeLineID timeline
Definition: replnodes.h:120
char data[BLCKSZ]
Definition: c.h:1090
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, generate_unaccent_rules::dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1433 of file walsender.c.

1434{
1436 QueryCompletion qc;
1437
1438 /* make sure that our requirements are still fulfilled */
1440
1442
1443 ReplicationSlotAcquire(cmd->slotname, true, true);
1444
1445 /*
1446 * Force a disconnect, so that the decoding code doesn't need to care
1447 * about an eventual switch from running in recovery, to running in a
1448 * normal environment. Client code is expected to handle reconnects.
1449 */
1451 {
1452 ereport(LOG,
1453 (errmsg("terminating walsender process after promotion")));
1454 got_STOPPING = true;
1455 }
1456
1457 /*
1458 * Create our decoding context, making it start at the previously ack'ed
1459 * position.
1460 *
1461 * Do this before sending a CopyBothResponse message, so that any errors
1462 * are reported early.
1463 */
1465 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1467 .segment_open = WalSndSegmentOpen,
1468 .segment_close = wal_segment_close),
1472
1474
1475 /* Send a CopyBothResponse message, and start streaming */
1477 pq_sendbyte(&buf, 0);
1478 pq_sendint16(&buf, 0);
1480 pq_flush();
1481
1482 /* Start reading WAL from the oldest required WAL. */
1485
1486 /*
1487 * Report the location after which we'll send out further commits as the
1488 * current sentPtr.
1489 */
1491
1492 /* Also update the sent position status in shared memory */
1496
1497 replication_active = true;
1498
1500
1501 /* Main loop of walsender */
1503
1506
1507 replication_active = false;
1508 if (got_STOPPING)
1509 proc_exit(0);
1511
1512 /* Get out of COPY mode (CommandComplete). */
1513 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1514 EndCommand(&qc, DestRemote, false);
1515}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:169
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:496
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:559
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:97
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2783
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:213
static void XLogSendLogical(void)
Definition: walsender.c:3405
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:232

References am_cascading_walsender, Assert(), buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 795 of file walsender.c.

796{
798 XLogRecPtr FlushPtr;
799 TimeLineID FlushTLI;
800
801 /* create xlogreader for physical replication */
802 xlogreader =
804 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
805 .segment_close = wal_segment_close),
806 NULL);
807
808 if (!xlogreader)
810 (errcode(ERRCODE_OUT_OF_MEMORY),
811 errmsg("out of memory"),
812 errdetail("Failed while allocating a WAL reading processor.")));
813
814 /*
815 * We assume here that we're logging enough information in the WAL for
816 * log-shipping, since this is checked in PostmasterMain().
817 *
818 * NOTE: wal_level can only change at shutdown, so in most cases it is
819 * difficult for there to be WAL data that we can still see that was
820 * written at wal_level='minimal'.
821 */
822
823 if (cmd->slotname)
824 {
825 ReplicationSlotAcquire(cmd->slotname, true, true);
828 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
829 errmsg("cannot use a logical replication slot for physical replication")));
830
831 /*
832 * We don't need to verify the slot's restart_lsn here; instead we
833 * rely on the caller requesting the starting point to use. If the
834 * WAL segment doesn't exist, we'll fail later.
835 */
836 }
837
838 /*
839 * Select the timeline. If it was given explicitly by the client, use
840 * that. Otherwise use the timeline of the last replayed record.
841 */
844 FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
845 else
846 FlushPtr = GetFlushRecPtr(&FlushTLI);
847
848 if (cmd->timeline != 0)
849 {
850 XLogRecPtr switchpoint;
851
852 sendTimeLine = cmd->timeline;
853 if (sendTimeLine == FlushTLI)
854 {
857 }
858 else
859 {
860 List *timeLineHistory;
861
863
864 /*
865 * Check that the timeline the client requested exists, and the
866 * requested start location is on that timeline.
867 */
868 timeLineHistory = readTimeLineHistory(FlushTLI);
869 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
871 list_free_deep(timeLineHistory);
872
873 /*
874 * Found the requested timeline in the history. Check that
875 * requested startpoint is on that timeline in our history.
876 *
877 * This is quite loose on purpose. We only check that we didn't
878 * fork off the requested timeline before the switchpoint. We
879 * don't check that we switched *to* it before the requested
880 * starting point. This is because the client can legitimately
881 * request to start replication from the beginning of the WAL
882 * segment that contains switchpoint, but on the new timeline, so
883 * that it doesn't end up with a partial segment. If you ask for
884 * too old a starting point, you'll get an error later when we
885 * fail to find the requested WAL segment in pg_wal.
886 *
887 * XXX: we could be more strict here and only allow a startpoint
888 * that's older than the switchpoint, if it's still in the same
889 * WAL segment.
890 */
891 if (!XLogRecPtrIsInvalid(switchpoint) &&
892 switchpoint < cmd->startpoint)
893 {
895 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
897 cmd->timeline),
898 errdetail("This server's history forked from timeline %u at %X/%X.",
899 cmd->timeline,
900 LSN_FORMAT_ARGS(switchpoint))));
901 }
902 sendTimeLineValidUpto = switchpoint;
903 }
904 }
905 else
906 {
907 sendTimeLine = FlushTLI;
910 }
911
913
914 /* If there is nothing to stream, don't even enter COPY mode */
916 {
917 /*
918 * When we first start replication the standby will be behind the
919 * primary. For some applications, for example synchronous
920 * replication, it is important to have a clear state for this initial
921 * catchup mode, so we can trigger actions when we change streaming
922 * state later. We may stay in this state for a long time, which is
923 * exactly why we want to be able to monitor whether or not we are
924 * still here.
925 */
927
928 /* Send a CopyBothResponse message, and start streaming */
930 pq_sendbyte(&buf, 0);
931 pq_sendint16(&buf, 0);
933 pq_flush();
934
935 /*
936 * Don't allow a request to stream from a future point in WAL that
937 * hasn't been flushed to disk in this server yet.
938 */
939 if (FlushPtr < cmd->startpoint)
940 {
942 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
944 LSN_FORMAT_ARGS(FlushPtr))));
945 }
946
947 /* Start streaming from the requested point */
948 sentPtr = cmd->startpoint;
949
950 /* Initialize shared memory status, too */
954
956
957 /* Main loop of walsender */
958 replication_active = true;
959
961
962 replication_active = false;
963 if (got_STOPPING)
964 proc_exit(0);
966
968 }
969
970 if (cmd->slotname)
972
973 /*
974 * Copy is finished now. Send a single-row result set indicating the next
975 * timeline.
976 */
978 {
979 char startpos_str[8 + 1 + 8 + 1];
981 TupOutputState *tstate;
982 TupleDesc tupdesc;
983 Datum values[2];
984 bool nulls[2] = {0};
985
986 snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
988
990
991 /*
992 * Need a tuple descriptor representing two columns. int8 may seem
993 * like a surprising data type for this, but in theory int4 would not
994 * be wide enough for this, as TimeLineID is unsigned.
995 */
996 tupdesc = CreateTemplateTupleDesc(2);
997 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
998 INT8OID, -1, 0);
999 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1000 TEXTOID, -1, 0);
1001
1002 /* prepare for projection of tuple */
1003 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1004
1006 values[1] = CStringGetTextDatum(startpos_str);
1007
1008 /* send it to dest */
1009 do_tup_output(tstate, values, nulls);
1010
1011 end_tup_output(tstate);
1012 }
1013
1014 /* Send CommandComplete message */
1015 EndReplicationCommand("START_STREAMING");
1016}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1204
void list_free_deep(List *list)
Definition: list.c:1560
TimeLineID timeline
Definition: replnodes.h:96
static void XLogSendPhysical(void)
Definition: walsender.c:3095
int wal_segment_size
Definition: xlog.c:143
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:107

References am_cascading_walsender, Assert(), begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2557 of file walsender.c.

2558{
2559 FullTransactionId nextFullXid;
2560 TransactionId nextXid;
2561 uint32 nextEpoch;
2562
2563 nextFullXid = ReadNextFullTransactionId();
2564 nextXid = XidFromFullTransactionId(nextFullXid);
2565 nextEpoch = EpochFromFullTransactionId(nextFullXid);
2566
2567 if (xid <= nextXid)
2568 {
2569 if (epoch != nextEpoch)
2570 return false;
2571 }
2572 else
2573 {
2574 if (epoch + 1 != nextEpoch)
2575 return false;
2576 }
2577
2578 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2579 return false; /* epoch OK, but it's wrapped around */
2580
2581 return true;
2582}
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 653 of file walsender.c.

654{
655 MemoryContext mcxt;
657 off_t offset = 0;
659
660 /*
661 * parsing the manifest will use the cryptohash stuff, which requires a
662 * resource owner
663 */
666 CurrentResourceOwner == NULL);
668
669 /* Prepare to read manifest data into a temporary context. */
671 "incremental backup information",
674
675 /* Send a CopyInResponse message */
677 pq_sendbyte(&buf, 0);
678 pq_sendint16(&buf, 0);
680 pq_flush();
681
682 /* Receive packets from client until done. */
683 while (HandleUploadManifestPacket(&buf, &offset, ib))
684 ;
685
686 /* Finish up manifest processing. */
688
689 /*
690 * Discard any old manifest information and arrange to preserve the new
691 * information we just got.
692 *
693 * We assume that MemoryContextDelete and MemoryContextSetParent won't
694 * fail, and thus we shouldn't end up bailing out of here in such a way as
695 * to leave dangling pointers.
696 */
697 if (uploaded_manifest_mcxt != NULL)
702
703 /* clean up the resource owner we created */
705}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:668
MemoryContext CacheMemoryContext
Definition: mcxt.c:168
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:314
#define PqMsg_CopyInResponse
Definition: protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1019
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition: resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:719
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:153

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2756 of file walsender.c.

2757{
2758 TimestampTz timeout;
2759
2760 /* don't bail out if we're doing something that doesn't require timeouts */
2761 if (last_reply_timestamp <= 0)
2762 return;
2763
2764 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2765 wal_sender_timeout);
2766
2767 if (wal_sender_timeout > 0 && last_processing >= timeout)
2768 {
2769 /*
2770 * Since typically expiration of replication timeout means
2771 * communication problem, we don't send the error message to the
2772 * standby.
2773 */
2774 ereport(COMMERROR,
2775 (errmsg("terminating walsender process due to replication timeout")));
2776
2777 WalSndShutdown();
2778 }
2779}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:128

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2712 of file walsender.c.

2713{
2714 long sleeptime = 10000; /* 10 s */
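2715
2716 if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2717 {
2718 TimestampTz wakeup_time;
2719
2720 /*
2721 * At the latest stop sleeping once wal_sender_timeout has been
2722 * reached.
2723 */
2724 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2725 wal_sender_timeout);
2726
2727 /*
2728 * If no ping has been sent yet, wakeup when it's time to do so.
2729 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2730 * the timeout passed without a response.
2731 */
2732 if (!waiting_for_ping_response)
2733 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2734 wal_sender_timeout / 2);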
2735
2736 /* Compute relative time until wakeup. */
2737 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2738 }
2739
2740 return sleeptime;
2741}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3490 of file walsender.c.

3491{
3492 XLogRecPtr replicatedPtr;
3493
3494 /* ... let's just be real sure we're caught up ... */
3495 send_data();
3496
3497 /*
3498 * To figure out whether all WAL has successfully been replicated, check
3499 * flush location if valid, write otherwise. Tools like pg_receivewal will
3500 * usually (unless in synchronous mode) return an invalid flush location.
3501 */
3502 replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3503 MyWalSnd->write : MyWalSnd->flush;
3504
3505 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3506 !pq_is_send_pending())
3507 {
3508 QueryCompletion qc;
3509
3510 /* Inform the standby that XLOG streaming is done */
3511 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3512 EndCommand(&qc, DestRemote, false);
3513 pq_flush();
3514
3515 proc_exit(0);
3516 }
3517 if (!waiting_for_ping_response)
3518 WalSndKeepalive(true, InvalidXLogRecPtr);
3519}
static bool WalSndCaughtUp
Definition: walsender.c:199

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 330 of file walsender.c.

331{
336
337 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
339
340 if (MyReplicationSlot != NULL)
342
344
345 replication_active = false;
346
347 /*
348 * If there is a transaction in progress, it will clean up our
349 * ResourceOwner, but if a replication command set up a resource owner
350 * without a transaction, we've got to clean that up now.
351 */
354
356 proc_exit(0);
357
358 /* Revert back to startup state */
360}
void pgaio_error_cleanup(void)
Definition: aio.c:1064
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1953
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:775
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:202
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 3857 of file walsender.c.

3858{
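3859 switch (state)
3860 {
3861 case WALSNDSTATE_STARTUP:
3862 return "startup";
3863 case WALSNDSTATE_BACKUP:
3864 return "backup";
3865 case WALSNDSTATE_CATCHUP:
3866 return "catchup";
3867 case WALSNDSTATE_STREAMING:
3868 return "streaming";
3869 case WALSNDSTATE_STOPPING:
3870 return "stopping";
3871 }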
3872 return "UNKNOWN";
3873}
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3774 of file walsender.c.

3775{
3776 int i;
3777
3778 for (i = 0; i < max_wal_senders; i++)
3779 {
3780 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3781 pid_t pid;
3782
3783 SpinLockAcquire(&walsnd->mutex);
3784 pid = walsnd->pid;
3785 SpinLockRelease(&walsnd->mutex);
3786
3787 if (pid == 0)
3788 continue;
3789
3790 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, INVALID_PROC_NUMBER);
3791 }
3792}
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 4063 of file walsender.c.

4064{
4065 elog(DEBUG2, "sending replication keepalive");
4066
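4067 /* construct the message... */
4068 resetStringInfo(&output_message);
4069 pq_sendbyte(&output_message, 'k');
4070 pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
4071 pq_sendint64(&output_message, GetCurrentTimestamp());
4072 pq_sendbyte(&output_message, requestReply ? 1 : 0);
4073
4074 /* ... and send it wrapped in CopyData */
4075 pq_putmessage_noblock('d', output_message.data, output_message.len);
4076
4077 /* Set local flag */
4078 if (requestReply)
4079 waiting_for_ping_response = true;
4080}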
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4086 of file walsender.c.

4087{
4088 TimestampTz ping_time;
4089
4090 /*
4091 * Don't send keepalive messages if timeouts are globally disabled or
4092 * we're doing something not partaking in timeouts.
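4093 */
4094 if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
4095 return;
4096
4097 if (waiting_for_ping_response)
4098 return;
4099
4100 /*
4101 * If half of wal_sender_timeout has lapsed without receiving any reply
4102 * from the standby, send a keep-alive message to the standby requesting
4103 * an immediate reply.
4104 */
4105 ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
4106 wal_sender_timeout / 2);
4107 if (last_processing >= ping_time)
4108 {
4109 WalSndKeepalive(true, InvalidXLogRecPtr);
4110
4111 /* Try to flush pending output to the client */
4112 if (pq_flush_if_writable() != 0)
4113 WalSndShutdown();
4114 }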
4115}

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3001 of file walsender.c.

3002{
3003 WalSnd *walsnd = MyWalSnd;
3004
3005 Assert(walsnd != NULL);
3006
3007 MyWalSnd = NULL;
3008
3009 SpinLockAcquire(&walsnd->mutex);
3010 /* Mark WalSnd struct as no longer being in use. */
3011 walsnd->pid = 0;
3012 SpinLockRelease(&walsnd->mutex);
3013}

References Assert(), WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3611 of file walsender.c.

3612{
3613 got_SIGUSR2 = true;
3614 SetLatch(MyLatch);
3615}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2783 of file walsender.c.

2784{
2785 TimestampTz last_flush = 0;
2786
2787 /*
2788 * Initialize the last reply timestamp. That enables timeout processing
2789 * from hereon.
2790 */
2793
2794 /*
2795 * Loop until we reach the end of this timeline or the client requests to
2796 * stop streaming.
2797 */
2798 for (;;)
2799 {
2800 /* Clear any already-pending wakeups */
2802
2804
2805 /* Process any requests or signals received recently */
2807 {
2808 ConfigReloadPending = false;
2811 }
2812
2813 /* Check for input from the client */
2815
2816 /*
2817 * If we have received CopyDone from the client, sent CopyDone
2818 * ourselves, and the output buffer is empty, it's time to exit
2819 * streaming.
2820 */
2823 break;
2824
2825 /*
2826 * If we don't have any pending data in the output buffer, try to send
2827 * some more. If there is some, we don't bother to call send_data
2828 * again until we've flushed it ... but we'd better assume we are not
2829 * caught up.
2830 */
2831 if (!pq_is_send_pending())
2832 send_data();
2833 else
2834 WalSndCaughtUp = false;
2835
2836 /* Try to flush pending output to the client */
2837 if (pq_flush_if_writable() != 0)
2839
2840 /* If nothing remains to be sent right now ... */
2842 {
2843 /*
2844 * If we're in catchup state, move to streaming. This is an
2845 * important state change for users to know about, since before
2846 * this point data loss might occur if the primary dies and we
2847 * need to failover to the standby. The state change is also
2848 * important for synchronous replication, since commits that
2849 * started to wait at that point might wait for some time.
2850 */
2852 {
2854 (errmsg_internal("\"%s\" has now caught up with upstream server",
2857 }
2858
2859 /*
2860 * When SIGUSR2 arrives, we send any outstanding logs up to the
2861 * shutdown checkpoint record (i.e., the latest record), wait for
2862 * them to be replicated to the standby, and exit. This may be a
2863 * normal termination at shutdown, or a promotion, the walsender
2864 * is not sure which.
2865 */
2866 if (got_SIGUSR2)
2867 WalSndDone(send_data);
2868 }
2869
2870 /* Check for replication timeout. */
2872
2873 /* Send keepalive if the time has come */
2875
2876 /*
2877 * Block if we have unsent data. XXX For logical replication, let
2878 * WalSndWaitForWal() handle any other blocking; idle receivers need
2879 * its additional actions. For physical replication, also block if
2880 * caught up; its send_data does not block.
2881 *
2882 * The IO statistics are reported in WalSndWaitForWal() for the
2883 * logical WAL senders.
2884 */
2885 if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2888 {
2889 long sleeptime;
2890 int wakeEvents;
2892
2894 wakeEvents = WL_SOCKET_READABLE;
2895 else
2896 wakeEvents = 0;
2897
2898 /*
2899 * Use fresh timestamp, not last_processing, to reduce the chance
2900 * of reaching wal_sender_timeout before sending a keepalive.
2901 */
2903 sleeptime = WalSndComputeSleeptime(now);
2904
2905 if (pq_is_send_pending())
2906 wakeEvents |= WL_SOCKET_WRITEABLE;
2907
2908 /* Report IO statistics, if needed */
2909 if (TimestampDifferenceExceeds(last_flush, now,
2911 {
2912 pgstat_flush_io(false);
2914 last_flush = now;
2915 }
2916
2917 /* Sleep until something happens or we time out */
2918 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2919 }
2920 }
2921}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
char * application_name
Definition: guc_tables.c:560
bool pgstat_flush_backend(bool nowait, bits32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition: pgstat_io.c:183
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition: walsender.c:100
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3490

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1526 of file walsender.c.

1527{
1528 /* can't have sync rep confused by sending the same LSN several times */
1529 if (!last_write)
1530 lsn = InvalidXLogRecPtr;
1531
1532 resetStringInfo(ctx->out);
1533
1534 pq_sendbyte(ctx->out, 'w');
1535 pq_sendint64(ctx->out, lsn); /* dataStart */
1536 pq_sendint64(ctx->out, lsn); /* walEnd */
1537
1538 /*
1539 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1540 * reserve space here.
1541 */
1542 pq_sendint64(ctx->out, 0); /* sendtime */
1543}
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3566 of file walsender.c.

3567{
3568 int i;
3569
3570 for (i = 0; i < max_wal_senders; i++)
3571 {
3572 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3573
3574 SpinLockAcquire(&walsnd->mutex);
3575 if (walsnd->pid == 0)
3576 {
3577 SpinLockRelease(&walsnd->mutex);
3578 continue;
3579 }
3580 walsnd->needreload = true;
3581 SpinLockRelease(&walsnd->mutex);
3582 }
3583}

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 3017 of file walsender.c.

3019{
3020 char path[MAXPGPATH];
3021
3022 /*-------
3023 * When reading from a historic timeline, and there is a timeline switch
3024 * within this segment, read from the WAL segment belonging to the new
3025 * timeline.
3026 *
3027 * For example, imagine that this server is currently on timeline 5, and
3028 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3029 * 0/13002088. In pg_wal, we have these files:
3030 *
3031 * ...
3032 * 000000040000000000000012
3033 * 000000040000000000000013
3034 * 000000050000000000000013
3035 * 000000050000000000000014
3036 * ...
3037 *
3038 * In this situation, when requested to send the WAL from segment 0x13, on
3039 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3040 * recovery prefers files from newer timelines, so if the segment was
3041 * restored from the archive on this server, the file belonging to the old
3042 * timeline, 000000040000000000000013, might not exist. Their contents are
3043 * equal up to the switchpoint, because at a timeline switch, the used
3044 * portion of the old segment is copied to the new file.
3045 */
3046 *tli_p = sendTimeLine;
3048 {
3049 XLogSegNo endSegNo;
3050
3051 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3052 if (nextSegNo == endSegNo)
3053 *tli_p = sendTimeLineNextTLI;
3054 }
3055
3056 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3057 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3058 if (state->seg.ws_file >= 0)
3059 return;
3060
3061 /*
3062 * If the file is not found, assume it's because the standby asked for
3063 * a WAL segment so old that it has already been removed or recycled.
3064 */
3065 if (errno == ENOENT)
3066 {
3067 char xlogfname[MAXFNAMELEN];
3068 int save_errno = errno;
3069
3070 XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
3071 errno = save_errno;
3072 ereport(ERROR,
3074 errmsg("requested WAL segment %s has already been removed",
3075 xlogfname)));
3076 }
3077 else
3078 ereport(ERROR,
3080 errmsg("could not open file \"%s\": %m",
3081 path)));
3082}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1089
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3838 of file walsender.c.

3839{
3840 WalSnd *walsnd = MyWalSnd;
3841
3843
3844 if (walsnd->state == state)
3845 return;
3846
3847 SpinLockAcquire(&walsnd->mutex);
3848 walsnd->state = state;
3849 SpinLockRelease(&walsnd->mutex);
3850}

References am_walsender, Assert(), WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3650 of file walsender.c.

3651{
3652 bool found;
3653 int i;
3654
3656 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3657
3658 if (!found)
3659 {
3660 /* First time through, so initialize */
3662
3663 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3665
3666 for (i = 0; i < max_wal_senders; i++)
3667 {
3668 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3669
3670 SpinLockInit(&walsnd->mutex);
3671 }
3672
3676 }
3677}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3638

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3638 of file walsender.c.

3639{
3640 Size size = 0;
3641
3642 size = offsetof(WalSndCtlData, walsnds);
3643 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3644
3645 return size;
3646}
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 366 of file walsender.c.

367{
368 /*
369 * Reset whereToSendOutput to prevent ereport from attempting to send any
370 * more messages to the standby.
371 */
374
375 proc_exit(0);
376 abort(); /* keep the compiler quiet */
377}
@ DestNone
Definition: dest.h:87
CommandDest whereToSendOutput
Definition: postgres.c:91

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3619 of file walsender.c.

3620{
3621 /* Set up signal handlers */
3623 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3624 pqsignal(SIGTERM, die); /* request shutdown */
3625 /* SIGQUIT handler was already set up by InitPostmasterChild */
3626 InitializeTimeouts(); /* establishes SIGALRM handler */
3627 pqsignal(SIGPIPE, SIG_IGN);
3629 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3630 * shutdown */
3631
3632 /* Reset some signals that are accepted by postmaster but not here */
3633 pqsignal(SIGCHLD, SIG_DFL);
3634}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:65
#define die(msg)
#define pqsignal
Definition: port.h:531
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3058
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:674
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3611
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1649 of file walsender.c.

1651{
1652 static TimestampTz sendTime = 0;
1654 bool pending_writes = false;
1655 bool end_xact = ctx->end_xact;
1656
1657 /*
1658 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1659 * avoid flooding the lag tracker when we commit frequently.
1660 *
1661 * We don't have a mechanism to get the ack for any LSN other than end
1662 * xact LSN from the downstream. So, we track lag only for end of
1663 * transaction LSN.
1664 */
1665#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1666 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1668 {
1669 LagTrackerWrite(lsn, now);
1670 sendTime = now;
1671 }
1672
1673 /*
1674 * When skipping empty transactions in synchronous replication, we send a
1675 * keepalive message to avoid delaying such transactions.
1676 *
1677 * It is okay to check sync_standbys_status without a lock here, as in
1678 * the worst case we will just send an extra keepalive message when it
1679 * is really not required.
1680 */
1681 if (skipped_xact &&
1682 SyncRepRequested() &&
1683 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1684 {
1685 WalSndKeepalive(false, lsn);
1686
1687 /* Try to flush pending output to the client */
1688 if (pq_flush_if_writable() != 0)
1690
1691 /* If we have pending write here, make sure it's actually flushed */
1692 if (pq_is_send_pending())
1693 pending_writes = true;
1694 }
1695
1696 /*
1697 * Process pending writes, if any, or try to send a keepalive if required.
1698 * We don't need to try sending keepalive messages at the transaction end,
1699 * as that will be done at a later point in time. This is required only
1700 * for large transactions where we don't send any changes to the
1701 * downstream and the receiver can time out because of that.
1702 */
1703 if (pending_writes || (!end_xact &&
1705 wal_sender_timeout / 2)))
1707}
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1595
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:4124
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3716 of file walsender.c.

3717{
3718 WaitEvent event;
3719
3720 ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3721
3722 /*
3723 * We use a condition variable to efficiently wake up walsenders in
3724 * WalSndWakeup().
3725 *
3726 * Every walsender prepares to sleep on a shared memory CV. Note that it
3727 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3728 * waitlist), but does not actually wait on the CV (IOW, it never calls
3729 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3730 * waiting, because we also need to wait for socket events. The processes
3731 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3732 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3733 * walsenders come out of WaitEventSetWait().
3734 *
3735 * This approach is simple and efficient because one doesn't have to loop
3736 * through all the walsender slots, with a spinlock acquisition and
3737 * release for every iteration, just to wake up only the waiting
3738 * walsenders. It makes life easy for WalSndWakeup() callers.
3739 *
3740 * XXX: A desirable future improvement would be to add support for CVs
3741 * into WaitEventSetWait().
3742 *
3743 * We also use separate shared memory CVs for physical and logical
3744 * walsenders for selective wakeups; see WalSndWakeup() for more details.
3745 *
3746 * If the wait event is WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION, wait
3747 * on another CV until awakened by physical walsenders after the
3748 * walreceiver confirms the receipt of the LSN.
3749 */
3750 if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
3756
3757 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3758 (event.events & WL_POSTMASTER_DEATH))
3759 {
3761 proc_exit(1);
3762 }
3763
3765}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:166
uint32 events
Definition: waiteventset.h:62
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: waiteventset.c:655
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
Definition: waiteventset.h:38

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1799 of file walsender.c.

1800{
1801 int wakeEvents;
1802 uint32 wait_event = 0;
1803 static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1804 TimestampTz last_flush = 0;
1805
1806 /*
1807 * Fast path to avoid acquiring the spinlock in case we already know we
1808 * have enough WAL available and all the standby servers have confirmed
1809 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1810 * if we're far behind.
1811 */
1812 if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
1813 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1814 return RecentFlushPtr;
1815
1816 /*
1817 * Within the loop, we wait for the necessary WALs to be flushed to disk
1818 * first, followed by waiting for standbys to catch up if there are enough
1819 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1820 */
1821 for (;;)
1822 {
1823 bool wait_for_standby_at_stop = false;
1824 long sleeptime;
1826
1827 /* Clear any already-pending wakeups */
1829
1831
1832 /* Process any requests or signals received recently */
1834 {
1835 ConfigReloadPending = false;
1838 }
1839
1840 /* Check for input from the client */
1842
1843 /*
1844 * If we're shutting down, trigger pending WAL to be written out,
1845 * otherwise we'd possibly end up waiting for WAL that never gets
1846 * written, because walwriter has shut down already.
1847 */
1848 if (got_STOPPING)
1850
1851 /*
1852 * To avoid the scenario where standbys need to catch up to a newer
1853 * WAL location in each iteration, we update our idea of the currently
1854 * flushed position only if we are not waiting for standbys to catch
1855 * up.
1856 */
1857 if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1858 {
1859 if (!RecoveryInProgress())
1860 RecentFlushPtr = GetFlushRecPtr(NULL);
1861 else
1862 RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1863 }
1864
1865 /*
1866 * If postmaster asked us to stop and the standby slots have caught up
1867 * to the flushed position, don't wait anymore.
1868 *
1869 * It's important to do this check after the recomputation of
1870 * RecentFlushPtr, so we can send all remaining data before shutting
1871 * down.
1872 */
1873 if (got_STOPPING)
1874 {
1875 if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
1876 wait_for_standby_at_stop = true;
1877 else
1878 break;
1879 }
1880
1881 /*
1882 * We only send regular messages to the client for fully decoded
1883 * transactions, but synchronous replication and walsender shutdown
1884 * may be waiting for a later location. So, before sleeping, we
1885 * send a ping containing the flush location. If the receiver is
1886 * otherwise idle, this keepalive will trigger a reply. Processing the
1887 * reply will update these MyWalSnd locations.
1888 */
1889 if (MyWalSnd->flush < sentPtr &&
1890 MyWalSnd->write < sentPtr &&
1893
1894 /*
1895 * Exit the loop if we're already caught up and don't need to wait for
1896 * standby slots.
1897 */
1898 if (!wait_for_standby_at_stop &&
1899 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1900 break;
1901
1902 /*
1903 * Waiting for new WAL or waiting for standbys to catch up. Since we
1904 * need to wait, we're now caught up.
1905 */
1906 WalSndCaughtUp = true;
1907
1908 /*
1909 * Try to flush any pending output to the client.
1910 */
1911 if (pq_flush_if_writable() != 0)
1913
1914 /*
1915 * If we have received CopyDone from the client, sent CopyDone
1916 * ourselves, and the output buffer is empty, it's time to exit
1917 * streaming, so fail the current WAL fetch request.
1918 */
1921 break;
1922
1923 /* die if timeout was reached */
1925
1926 /* Send keepalive if the time has come */
1928
1929 /*
1930 * Sleep until something happens or we time out. Also wait for the
1931 * socket becoming writable, if there's still pending output.
1932 * Otherwise we might sit on sendable output data while waiting for
1933 * new WAL to be generated. (But if we have nothing to send, we don't
1934 * want to wake on socket-writable.)
1935 */
1937 sleeptime = WalSndComputeSleeptime(now);
1938
1939 wakeEvents = WL_SOCKET_READABLE;
1940
1941 if (pq_is_send_pending())
1942 wakeEvents |= WL_SOCKET_WRITEABLE;
1943
1944 Assert(wait_event != 0);
1945
1946 /* Report IO statistics, if needed */
1947 if (TimestampDifferenceExceeds(last_flush, now,
1949 {
1950 pgstat_flush_io(false);
1952 last_flush = now;
1953 }
1954
1955 WalSndWait(wakeEvents, sleeptime, wait_event);
1956 }
1957
1958 /* reactivate latch so WalSndLoop knows to continue */
1960 return RecentFlushPtr;
1961}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1771
bool XLogBackgroundFlush(void)
Definition: xlog.c:3111

References Assert(), CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsInvalid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3800 of file walsender.c.

3801{
3802 for (;;)
3803 {
3804 int i;
3805 bool all_stopped = true;
3806
3807 for (i = 0; i < max_wal_senders; i++)
3808 {
3809 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3810
3811 SpinLockAcquire(&walsnd->mutex);
3812
3813 if (walsnd->pid == 0)
3814 {
3815 SpinLockRelease(&walsnd->mutex);
3816 continue;
3817 }
3818
3819 if (walsnd->state != WALSNDSTATE_STOPPING)
3820 {
3821 all_stopped = false;
3822 SpinLockRelease(&walsnd->mutex);
3823 break;
3824 }
3825 SpinLockRelease(&walsnd->mutex);
3826 }
3827
3828 /* safe to leave if confirmation is done for all WAL senders */
3829 if (all_stopped)
3830 return;
3831
3832 pg_usleep(10000L); /* wait for 10 msec */
3833 }
3834}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3695 of file walsender.c.

3696{
3697 /*
3698 * Wake up all the walsenders waiting on WAL being flushed or replayed
3699 * respectively. Note that a waiting walsender would have prepared to sleep
3700 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3701 * before actually waiting.
3702 */
3703 if (physical)
3705
3706 if (logical)
3708}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1553 of file walsender.c.

1555{
1557
1558 /*
1559 * Fill the send timestamp last, so that it is taken as late as possible.
1560 * This is somewhat ugly, but the protocol is set as it's already used for
1561 * several releases by streaming physical replication.
1562 */
1566 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1567 tmpbuf.data, sizeof(int64));
1568
1569 /* output previously gathered data in a CopyData packet */
1570 pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1571
1573
1574 /* Try to flush pending output to the client */
1575 if (pq_flush_if_writable() != 0)
1577
1578 /* Try taking fast path unless we get too close to walsender timeout. */
1580 wal_sender_timeout / 2) &&
1582 {
1583 return;
1584 }
1585
1586 /* If we have pending write here, go to slow path */
1588}

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3405 of file walsender.c.

3406{
3407 XLogRecord *record;
3408 char *errm;
3409
3410 /*
3411 * We'll use the current flush point to determine whether we've caught up.
3412 * This variable is static in order to cache it across calls. Caching is
3413 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3414 * spinlock.
3415 */
3416 static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3417
3418 /*
3419 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3420 * true in WalSndWaitForWal, if we're actually waiting. We also set it
3421 * to true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3422 * didn't wait - i.e. when we're shutting down.
3423 */
3424 WalSndCaughtUp = false;
3425
3426 record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3427
3428 /* xlog record was invalid */
3429 if (errm != NULL)
3430 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3431 errm);
3432
3433 if (record != NULL)
3434 {
3435 /*
3436 * Note the lack of any call to LagTrackerWrite() which is handled by
3437 * WalSndUpdateProgress which is called by output plugin through
3438 * logical decoding write api.
3439 */
3441
3443 }
3444
3445 /*
3446 * If first time through in this session, initialize flushPtr. Otherwise,
3447 * we only need to update flushPtr if EndRecPtr is past it.
3448 */
3449 if (flushPtr == InvalidXLogRecPtr ||
3451 {
3453 flushPtr = GetStandbyFlushRecPtr(NULL);
3454 else
3455 flushPtr = GetFlushRecPtr(NULL);
3456 }
3457
3458 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3459 if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3460 WalSndCaughtUp = true;
3461
3462 /*
3463 * If we're caught up and have been requested to stop, have WalSndLoop()
3464 * terminate the connection in an orderly manner, after writing out all
3465 * the pending data.
3466 */
3468 got_SIGUSR2 = true;
3469
3470 /* Update shared memory status */
3471 {
3472 WalSnd *walsnd = MyWalSnd;
3473
3474 SpinLockAcquire(&walsnd->mutex);
3475 walsnd->sentPtr = sentPtr;
3476 SpinLockRelease(&walsnd->mutex);
3477 }
3478}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:88
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:390

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3095 of file walsender.c.

3096{
3097 XLogRecPtr SendRqstPtr;
3098 XLogRecPtr startptr;
3099 XLogRecPtr endptr;
3100 Size nbytes;
3101 XLogSegNo segno;
3102 WALReadError errinfo;
3103 Size rbytes;
3104
3105 /* If requested, switch the WAL sender to the stopping state. */
3106 if (got_STOPPING)
3108
3110 {
3111 WalSndCaughtUp = true;
3112 return;
3113 }
3114
3115 /* Figure out how far we can safely send the WAL. */
3117 {
3118 /*
3119 * Streaming an old timeline that's in this server's history, but is
3120 * not the one we're currently inserting or replaying. It can be
3121 * streamed up to the point where we switched off that timeline.
3122 */
3123 SendRqstPtr = sendTimeLineValidUpto;
3124 }
3125 else if (am_cascading_walsender)
3126 {
3127 TimeLineID SendRqstTLI;
3128
3129 /*
3130 * Streaming the latest timeline on a standby.
3131 *
3132 * Attempt to send all WAL that has already been replayed, so that we
3133 * know it's valid. If we're receiving WAL through streaming
3134 * replication, it's also OK to send any WAL that has been received
3135 * but not replayed.
3136 *
3137 * The timeline we're recovering from can change, or we can be
3138 * promoted. In either case, the current timeline becomes historic. We
3139 * need to detect that so that we don't try to stream past the point
3140 * where we switched to another timeline. We check for promotion or
3141 * timeline switch after calculating FlushPtr, to avoid a race
3142 * condition: if the timeline becomes historic just after we checked
3143 * that it was still current, it's still OK to stream it up to the
3144 * FlushPtr that was calculated before it became historic.
3145 */
3146 bool becameHistoric = false;
3147
3148 SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3149
3150 if (!RecoveryInProgress())
3151 {
3152 /* We have been promoted. */
3153 SendRqstTLI = GetWALInsertionTimeLine();
3154 am_cascading_walsender = false;
3155 becameHistoric = true;
3156 }
3157 else
3158 {
3159 /*
3160 * Still a cascading standby. But is the timeline we're sending
3161 * still the one recovery is recovering from?
3162 */
3163 if (sendTimeLine != SendRqstTLI)
3164 becameHistoric = true;
3165 }
3166
3167 if (becameHistoric)
3168 {
3169 /*
3170 * The timeline we were sending has become historic. Read the
3171 * timeline history file of the new timeline to see where exactly
3172 * we forked off from the timeline we were sending.
3173 */
3174 List *history;
3175
3176 history = readTimeLineHistory(SendRqstTLI);
3178
3180 list_free_deep(history);
3181
3183
3184 SendRqstPtr = sendTimeLineValidUpto;
3185 }
3186 }
3187 else
3188 {
3189 /*
3190 * Streaming the current timeline on a primary.
3191 *
3192 * Attempt to send all data that's already been written out and
3193 * fsync'd to disk. We cannot go further than what's been written out
3194 * given the current implementation of WALRead(). And in any case
3195 * it's unsafe to send WAL that is not securely down to disk on the
3196 * primary: if the primary subsequently crashes and restarts, standbys
3197 * must not have applied any WAL that got lost on the primary.
3198 */
3199 SendRqstPtr = GetFlushRecPtr(NULL);
3200 }
3201
3202 /*
3203 * Record the current system time as an approximation of the time at which
3204 * this WAL location was written for the purposes of lag tracking.
3205 *
3206 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3207 * is flushed and we could get that time as well as the LSN when we call
3208 * GetFlushRecPtr() above (and likewise for the cascading standby
3209 * equivalent), but rather than putting any new code into the hot WAL path
3210 * it seems good enough to capture the time here. We should reach this
3211 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3212 * may take some time, we read the WAL flush pointer and take the time
3213 * very close together here so that we'll get a later position if it is
3214 * still moving.
3215 *
3216 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3217 * this gives us a cheap approximation for the WAL flush time for this
3218 * LSN.
3219 *
3220 * Note that the LSN is not necessarily the LSN for the data contained in
3221 * the present message; it's the end of the WAL, which might be further
3222 * ahead. All the lag tracking machinery cares about is finding out when
3223 * that arbitrary LSN is eventually reported as written, flushed and
3224 * applied, so that it can measure the elapsed time.
3225 */
3226 LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3227
3228 /*
3229 * If this is a historic timeline and we've reached the point where we
3230 * forked to the next timeline, stop streaming.
3231 *
3232 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3233 * startup process will normally replay all WAL that has been received
3234 * from the primary, before promoting, but if the WAL streaming is
3235 * terminated at a WAL page boundary, the valid portion of the timeline
3236 * might end in the middle of a WAL record. We might've already sent the
3237 * first half of that partial WAL record to the cascading standby, so that
3238 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3239 * replay the partial WAL record either, so it can still follow our
3240 * timeline switch.
3241 */
3243 {
3244 /* close the current file. */
3245 if (xlogreader->seg.ws_file >= 0)
3247
3248 /* Send CopyDone */
3249 pq_putmessage_noblock('c', NULL, 0);
3250 streamingDoneSending = true;
3251
3252 WalSndCaughtUp = true;
3253
3254 elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3257 return;
3258 }
3259
3260 /* Do we have any work to do? */
3261 Assert(sentPtr <= SendRqstPtr);
3262 if (SendRqstPtr <= sentPtr)
3263 {
3264 WalSndCaughtUp = true;
3265 return;
3266 }
3267
3268 /*
3269 * Figure out how much to send in one message. If there's no more than
3270 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3271 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3272 *
3273 * The rounding is not only for performance reasons. Walreceiver relies on
3274 * the fact that we never split a WAL record across two messages. Since a
3275 * long WAL record is split at a page boundary into continuation records,
3276 * a page boundary is always a safe cut-off point. We also assume that
3277 * SendRqstPtr never points to the middle of a WAL record.
3278 */
3279 startptr = sentPtr;
3280 endptr = startptr;
3281 endptr += MAX_SEND_SIZE;
3282
3283 /* if we went beyond SendRqstPtr, back off */
3284 if (SendRqstPtr <= endptr)
3285 {
3286 endptr = SendRqstPtr;
3288 WalSndCaughtUp = false;
3289 else
3290 WalSndCaughtUp = true;
3291 }
3292 else
3293 {
3294 /* round down to page boundary. */
3295 endptr -= (endptr % XLOG_BLCKSZ);
3296 WalSndCaughtUp = false;
3297 }
3298
3299 nbytes = endptr - startptr;
3300 Assert(nbytes <= MAX_SEND_SIZE);
3301
3302 /*
3303 * OK to read and send the slice.
3304 */
3307
3308 pq_sendint64(&output_message, startptr); /* dataStart */
3309 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3310 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3311
3312 /*
3313 * Read the log directly into the output buffer to avoid extra memcpy
3314 * calls.
3315 */
3317
3318retry:
3319 /* attempt to read WAL from WAL buffers first */
3321 startptr, nbytes, xlogreader->seg.ws_tli);
3322 output_message.len += rbytes;
3323 startptr += rbytes;
3324 nbytes -= rbytes;
3325
3326 /* now read the remaining WAL from WAL file */
3327 if (nbytes > 0 &&
3330 startptr,
3331 nbytes,
3332 xlogreader->seg.ws_tli, /* Pass the current TLI because
3333 * only WalSndSegmentOpen controls
3334 * whether new TLI is needed. */
3335 &errinfo))
3336 WALReadRaiseError(&errinfo);
3337
3338 /* See logical_read_xlog_page(). */
3339 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3341
3342 /*
3343 * During recovery, the currently-open WAL file might be replaced with the
3344 * file of the same name retrieved from archive. So we always need to
3345 * check what we read was valid after reading into the buffer. If it's
3346 * invalid, we try to open and read the file again.
3347 */
3349 {
3350 WalSnd *walsnd = MyWalSnd;
3351 bool reload;
3352
3353 SpinLockAcquire(&walsnd->mutex);
3354 reload = walsnd->needreload;
3355 walsnd->needreload = false;
3356 SpinLockRelease(&walsnd->mutex);
3357
3358 if (reload && xlogreader->seg.ws_file >= 0)
3359 {
3361
3362 goto retry;
3363 }
3364 }
3365
3366 output_message.len += nbytes;
3368
3369 /*
3370 * Fill the send timestamp last, so that it is taken as late as possible.
3371 */
3374 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3375 tmpbuf.data, sizeof(int64));
3376
3378
3379 sentPtr = endptr;
3380
3381 /* Update shared memory status */
3382 {
3383 WalSnd *walsnd = MyWalSnd;
3384
3385 SpinLockAcquire(&walsnd->mutex);
3386 walsnd->sentPtr = sentPtr;
3387 SpinLockRelease(&walsnd->mutex);
3388 }
3389
3390 /* Report progress of XLOG streaming in PS display */
3392 {
3393 char activitymsg[50];
3394
3395 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3397 set_ps_display(activitymsg);
3398 }
3399}
bool update_process_title
Definition: ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:337
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:271
#define MAX_SEND_SIZE
Definition: walsender.c:111
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition: xlog.c:1759

References am_cascading_walsender, Assert(), CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 123 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 235 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 213 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

StringInfoData output_message
static

Definition at line 173 of file walsender.c.

Referenced by exec_replication_command(), WalSndKeepalive(), and XLogSendPhysical().

◆ replication_active

volatile sig_atomic_t replication_active = false
static

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 196 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 152 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 153 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

bool waiting_for_ping_response = false
static

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 135 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader