PostgreSQL Source Code git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessStandbyPSRequestMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 226 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 114 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 103 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 258 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd * cmd)
static

Definition at line 1416 of file walsender.c.

1417{
1418 bool failover_given = false;
1419 bool two_phase_given = false;
1420 bool failover;
1421 bool two_phase;
1422
1423 /* Parse options */
1424 foreach_ptr(DefElem, defel, cmd->options)
1425 {
1426 if (strcmp(defel->defname, "failover") == 0)
1427 {
1428 if (failover_given)
1429 ereport(ERROR,
1430 (errcode(ERRCODE_SYNTAX_ERROR),
1431 errmsg("conflicting or redundant options")));
1432 failover_given = true;
1433 failover = defGetBoolean(defel);
1434 }
1435 else if (strcmp(defel->defname, "two_phase") == 0)
1436 {
1437 if (two_phase_given)
1438 ereport(ERROR,
1439 (errcode(ERRCODE_SYNTAX_ERROR),
1440 errmsg("conflicting or redundant options")));
1441 two_phase_given = true;
1442 two_phase = defGetBoolean(defel);
1443 }
1444 else
1445 elog(ERROR, "unrecognized option: %s", defel->defname);
1446 }
1447
1448 ReplicationSlotAlter(cmd->slotname,
1449 failover_given ? &failover : NULL,
1450 two_phase_given ? &two_phase : NULL);
1451}
bool defGetBoolean(DefElem *def)
Definition: define.c:94
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
static bool failover
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:949

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, failover, foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1195 of file walsender.c.

1196{
1197 const char *snapshot_name = NULL;
1198 char xloc[MAXFNAMELEN];
1199 char *slot_name;
1200 bool reserve_wal = false;
1201 bool two_phase = false;
1202 bool failover = false;
1203 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1204 DestReceiver *dest;
1205 TupOutputState *tstate;
1206 TupleDesc tupdesc;
1207 Datum values[4];
1208 bool nulls[4] = {0};
1209
1210 Assert(!MyReplicationSlot);
1211
1212 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1213 &failover);
1214
1215 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1216 {
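1217 ReplicationSlotCreate(cmd->slotname, false,
1218 cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1219 false, false, false);
1220
1221 if (reserve_wal)
1222 {
1223 ReplicationSlotReserveWal();
1224
1225 ReplicationSlotMarkDirty();
1226
1227 /* Write this slot to disk if it's a permanent one. */
1228 if (!cmd->temporary)
1229 ReplicationSlotSave();
1230 }
1231 }
1232 else
1233 {
1234 LogicalDecodingContext *ctx;
1235 bool need_full_snapshot = false;
1236
1237 Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
1238
1239 CheckLogicalDecodingRequirements();
1240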
1241 /*
1242 * Initially create persistent slot as ephemeral - that allows us to
1243 * nicely handle errors during initialization because it'll get
1244 * dropped if this transaction fails. We'll make it persistent at the
1245 * end. Temporary slots can be created as temporary from beginning as
1246 * they get dropped on error as well.
1247 */
1248 ReplicationSlotCreate(cmd->slotname, true,
1249 cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1250 two_phase, failover, false);
1251
1252 /*
1253 * Do options check early so that we can bail before calling the
1254 * DecodingContextFindStartpoint which can take long time.
1255 */
1256 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1257 {
1258 if (IsTransactionBlock())
1259 ereport(ERROR,
1260 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1261 (errmsg("%s must not be called inside a transaction",
1262 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1263
1264 need_full_snapshot = true;
1265 }
1266 else if (snapshot_action == CRS_USE_SNAPSHOT)
1267 {
1268 if (!IsTransactionBlock())
1269 ereport(ERROR,
1270 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1271 (errmsg("%s must be called inside a transaction",
1272 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1273
1274 if (XactIsoLevel != XACT_REPEATABLE_READ)
1275 ereport(ERROR,
1276 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1277 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1278 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1279 if (!XactReadOnly)
1280 ereport(ERROR,
1281 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1282 (errmsg("%s must be called in a read-only transaction",
1283 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1284
1285 if (FirstSnapshotSet)
1286 ereport(ERROR,
1287 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1288 (errmsg("%s must be called before any query",
1289 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1290
1291 if (IsSubTransaction())
1292 ereport(ERROR,
1293 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1294 (errmsg("%s must not be called in a subtransaction",
1295 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1296
1297 need_full_snapshot = true;
1298 }
1299
1300 /*
1301 * Ensure the logical decoding is enabled before initializing the
1302 * logical decoding context.
1303 */
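1304 EnsureLogicalDecodingEnabled();
1305 Assert(IsLogicalDecodingEnabled());
1306
1307 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1308 InvalidXLogRecPtr,
1309 XL_ROUTINE(.page_read = logical_read_xlog_page,
1310 .segment_open = WalSndSegmentOpen,
1311 .segment_close = wal_segment_close),
1312 WalSndPrepareWrite, WalSndWriteData,
1313 WalSndUpdateProgress);
1314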
1315 /*
1316 * Signal that we don't need the timeout mechanism. We're just
1317 * creating the replication slot and don't yet accept feedback
1318 * messages or send keepalives. As we possibly need to wait for
1319 * further WAL the walsender would otherwise possibly be killed too
1320 * soon.
1321 */
1322 last_reply_timestamp = 0;
1323
1324 /* build initial snapshot, might take a while */
1325 DecodingContextFindStartpoint(ctx);
1326
1327 /*
1328 * Export or use the snapshot if we've been asked to do so.
1329 *
1330 * NB. We will convert the snapbuild.c kind of snapshot to normal
1331 * snapshot when doing this.
1332 */
1333 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1334 {
1335 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1336 }
1337 else if (snapshot_action == CRS_USE_SNAPSHOT)
1338 {
1339 Snapshot snap;
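1340
1341 snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1342 RestoreTransactionSnapshot(snap, MyProc);
1343 }
1344
1345 /* don't need the decoding context anymore */
1346 FreeDecodingContext(ctx);
1347
1348 if (!cmd->temporary)
1349 ReplicationSlotPersist();
1350 }
1351
1352 snprintf(xloc, sizeof(xloc), "%X/%08X",
1353 LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1354
1355 dest = CreateDestReceiver(DestRemoteSimple);
1356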
1357 /*----------
1358 * Need a tuple descriptor representing four columns:
1359 * - first field: the slot name
1360 * - second field: LSN at which we became consistent
1361 * - third field: exported snapshot's name
1362 * - fourth field: output plugin
1363 */
1364 tupdesc = CreateTemplateTupleDesc(4);
1365 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1366 TEXTOID, -1, 0);
1367 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1368 TEXTOID, -1, 0);
1369 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1370 TEXTOID, -1, 0);
1371 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1372 TEXTOID, -1, 0);
1373
1374 /* prepare for projection of tuples */
1375 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1376
1377 /* slot_name */
1378 slot_name = NameStr(MyReplicationSlot->data.name);
1379 values[0] = CStringGetTextDatum(slot_name);
1380
1381 /* consistent wal location */
1382 values[1] = CStringGetTextDatum(xloc);
1383
1384 /* snapshot name, or NULL if none */
1385 if (snapshot_name != NULL)
1386 values[2] = CStringGetTextDatum(snapshot_name);
1387 else
1388 nulls[2] = true;
1389
1390 /* plugin, or NULL if none */
1391 if (cmd->plugin != NULL)
1392 values[3] = CStringGetTextDatum(cmd->plugin);
1393 else
1394 nulls[3] = true;
1395
1396 /* send it to dest */
1397 do_tup_output(tstate, values, nulls);
1398 end_tup_output(tstate);
1399
1400 ReplicationSlotRelease();
1401}
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:155
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:771
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2464
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2522
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2444
Assert(PointerIsAligned(start, uint64))
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:668
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:624
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:321
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:111
bool IsLogicalDecodingEnabled(void)
Definition: logicalctl.c:204
void EnsureLogicalDecodingEnabled(void)
Definition: logicalctl.c:305
#define NIL
Definition: pg_list.h:68
#define snprintf
Definition: port.h:260
uint64_t Datum
Definition: postgres.h:70
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:378
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1173
void ReplicationSlotReserveWal(void)
Definition: slot.c:1693
void ReplicationSlotPersist(void)
Definition: slot.c:1190
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
void ReplicationSlotSave(void)
Definition: slot.c:1155
void ReplicationSlotRelease(void)
Definition: slot.c:758
@ RS_PERSISTENT
Definition: slot.h:45
@ RS_EPHEMERAL
Definition: slot.h:46
@ RS_TEMPORARY
Definition: slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:440
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:538
bool FirstSnapshotSet
Definition: snapmgr.c:193
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
Definition: snapmgr.c:1853
PGPROC * MyProc
Definition: proc.c:67
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:136
ReplicationSlotPersistentData data
Definition: slot.h:210
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:182
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:918
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1118
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:3114
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1578
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1674
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1045
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1551
static TimestampTz last_reply_timestamp
Definition: walsender.c:187
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:83
int XactIsoLevel
Definition: xact.c:80
bool IsSubTransaction(void)
Definition: xact.c:5066
bool IsTransactionBlock(void)
Definition: xact.c:4993
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References Assert(), begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EnsureLogicalDecodingEnabled(), ereport, errmsg(), ERROR, failover, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsLogicalDecodingEnabled(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

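Referenced by exec_replication_command(). (Doxygen additionally lists main() and StartLogStreamer(); those call the identically named frontend helper in src/bin/pg_basebackup/streamutil.c, not this static function.)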

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1407 of file walsender.c.

1408{
1409 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1410}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:909

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

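Referenced by exec_replication_command(). (Doxygen additionally lists main(); that call is to the identically named frontend helper in src/bin/pg_basebackup/streamutil.c, not this static function.)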

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1995 of file walsender.c.

1996{
1997 yyscan_t scanner;
1998 int parse_rc;
1999 Node *cmd_node;
2000 const char *cmdtag;
2001 MemoryContext old_context = CurrentMemoryContext;
2002
2003 /* We save and re-use the cmd_context across calls */
2004 static MemoryContext cmd_context = NULL;
2005
2006 /*
2007 * If WAL sender has been told that shutdown is getting close, switch its
2008 * status accordingly to handle the next replication commands correctly.
2009 */
2010 if (got_STOPPING)
2011 WalSndSetState(WALSNDSTATE_STOPPING);
2012
2013 /*
2014 * Throw error if in stopping mode. We need prevent commands that could
2015 * generate WAL while the shutdown checkpoint is being written. To be
2016 * safe, we just prohibit all new commands.
2017 */
2018 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
2019 ereport(ERROR,
2020 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2021 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2022
2023 /*
2024 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2025 * command arrives. Clean up the old stuff if there's anything.
2026 */
2027 SnapBuildClearExportedSnapshot();
2028
2029 CHECK_FOR_INTERRUPTS();
2030
2031 /*
2032 * Prepare to parse and execute the command.
2033 *
2034 * Because replication command execution can involve beginning or ending
2035 * transactions, we need a working context that will survive that, so we
2036 * make it a child of TopMemoryContext. That in turn creates a hazard of
2037 * long-lived memory leaks if we lose track of the working context. We
2038 * deal with that by creating it only once per walsender, and resetting it
2039 * for each new command. (Normally this reset is a no-op, but if the
2040 * prior exec_replication_command call failed with an error, it won't be.)
2041 *
2042 * This is subtler than it looks. The transactions we manage can extend
2043 * across replication commands, indeed SnapBuildClearExportedSnapshot
2044 * might have just ended one. Because transaction exit will revert to the
2045 * memory context that was current at transaction start, we need to be
2046 * sure that that context is still valid. That motivates re-using the
2047 * same cmd_context rather than making a new one each time.
2048 */
2049 if (cmd_context == NULL)
2050 cmd_context = AllocSetContextCreate(TopMemoryContext,
2051 "Replication command context",
2052 ALLOCSET_DEFAULT_SIZES);
2053 else
2054 MemoryContextReset(cmd_context);
2055
2056 MemoryContextSwitchTo(cmd_context);
2057
2058 replication_scanner_init(cmd_string, &scanner);
2059
2060 /*
2061 * Is it a WalSender command?
2062 */
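2063 if (!replication_scanner_is_replication_command(scanner))
2064 {
2065 /* Nope; clean up and get out. */
2066 replication_scanner_finish(scanner);
2067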
2068 MemoryContextSwitchTo(old_context);
2069 MemoryContextReset(cmd_context);
2070
2071 /* XXX this is a pretty random place to make this check */
2072 if (MyDatabaseId == InvalidOid)
2073 ereport(ERROR,
2074 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2075 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2076
2077 /* Tell the caller that this wasn't a WalSender command. */
2078 return false;
2079 }
2080
2081 /*
2082 * Looks like a WalSender command, so parse it.
2083 */
2084 parse_rc = replication_yyparse(&cmd_node, scanner);
2085 if (parse_rc != 0)
2086 ereport(ERROR,
2087 (errcode(ERRCODE_SYNTAX_ERROR),
2088 errmsg_internal("replication command parser returned %d",
2089 parse_rc)));
2090 replication_scanner_finish(scanner);
2091
2092 /*
2093 * Report query to various monitoring facilities. For this purpose, we
2094 * report replication commands just like SQL commands.
2095 */
2096 debug_query_string = cmd_string;
2097
2098 pgstat_report_activity(STATE_RUNNING, cmd_string);
2099
2100 /*
2101 * Log replication command if log_replication_commands is enabled. Even
2102 * when it's disabled, log the command with DEBUG1 level for backward
2103 * compatibility.
2104 */
2105 ereport(log_replication_commands ? LOG : DEBUG1,
2106 (errmsg("received replication command: %s", cmd_string)));
2107
2108 /*
2109 * Disallow replication commands in aborted transaction blocks.
2110 */
2111 if (IsAbortedTransactionBlockState())
2112 ereport(ERROR,
2113 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2114 errmsg("current transaction is aborted, "
2115 "commands ignored until end of transaction block")));
2116
2117 CHECK_FOR_INTERRUPTS();
2118
2119 /*
2120 * Allocate buffers that will be used for each outgoing and incoming
2121 * message. We do this just once per command to reduce palloc overhead.
2122 */
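2123 initStringInfo(&output_message);
2124 initStringInfo(&reply_message);
2125 initStringInfo(&tmpbuf);
2126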
2127 switch (cmd_node->type)
2128 {
2129 case T_IdentifySystemCmd:
2130 cmdtag = "IDENTIFY_SYSTEM";
2131 set_ps_display(cmdtag);
2132 IdentifySystem();
2133 EndReplicationCommand(cmdtag);
2134 break;
2135
2136 case T_ReadReplicationSlotCmd:
2137 cmdtag = "READ_REPLICATION_SLOT";
2138 set_ps_display(cmdtag);
2139 ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
2140 EndReplicationCommand(cmdtag);
2141 break;
2142
2143 case T_BaseBackupCmd:
2144 cmdtag = "BASE_BACKUP";
2145 set_ps_display(cmdtag);
2146 PreventInTransactionBlock(true, cmdtag);
2147 SendBaseBackup((BaseBackupCmd *) cmd_node, uploaded_manifest);
2148 EndReplicationCommand(cmdtag);
2149 break;
2150
2151 case T_CreateReplicationSlotCmd:
2152 cmdtag = "CREATE_REPLICATION_SLOT";
2153 set_ps_display(cmdtag);
2154 CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
2155 EndReplicationCommand(cmdtag);
2156 break;
2157
2158 case T_DropReplicationSlotCmd:
2159 cmdtag = "DROP_REPLICATION_SLOT";
2160 set_ps_display(cmdtag);
2161 DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
2162 EndReplicationCommand(cmdtag);
2163 break;
2164
2165 case T_AlterReplicationSlotCmd:
2166 cmdtag = "ALTER_REPLICATION_SLOT";
2167 set_ps_display(cmdtag);
2168 AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
2169 EndReplicationCommand(cmdtag);
2170 break;
2171
2172 case T_StartReplicationCmd:
2173 {
2174 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2175
2176 cmdtag = "START_REPLICATION";
2177 set_ps_display(cmdtag);
2178 PreventInTransactionBlock(true, cmdtag);
2179
2180 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2181 StartReplication(cmd);
2182 else
2183 StartLogicalReplication(cmd);
2184
2185 /* dupe, but necessary per libpqrcv_endstreaming */
2186 EndReplicationCommand(cmdtag);
2187
2188 Assert(xlogreader != NULL);
2189 break;
2190 }
2191
2192 case T_TimeLineHistoryCmd:
2193 cmdtag = "TIMELINE_HISTORY";
2194 set_ps_display(cmdtag);
2195 PreventInTransactionBlock(true, cmdtag);
2196 SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
2197 EndReplicationCommand(cmdtag);
2198 break;
2199
2200 case T_VariableShowStmt:
2201 {
2202 DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
2203 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2204
2205 cmdtag = "SHOW";
2206 set_ps_display(cmdtag);
2207
2208 /* syscache access needs a transaction environment */
2209 StartTransactionCommand();
2210 GetPGVariable(n->name, dest);
2211 CommitTransactionCommand();
2212 EndReplicationCommand(cmdtag);
2213 }
2214 break;
2215
2216 case T_UploadManifestCmd:
2217 cmdtag = "UPLOAD_MANIFEST";
2218 set_ps_display(cmdtag);
2219 PreventInTransactionBlock(true, cmdtag);
2220 UploadManifest();
2221 EndReplicationCommand(cmdtag);
2222 break;
2223
2224 default:
2225 elog(ERROR, "unrecognized replication command node tag: %u",
2226 cmd_node->type);
2227 }
2228
2229 /*
2230 * Done. Revert to caller's memory context, and clean out the cmd_context
2231 * to recover memory right away.
2232 */
2233 MemoryContextSwitchTo(old_context);
2234 MemoryContextReset(cmd_context);
2235
2236 /*
2237 * We need not update ps display or pg_stat_activity, because PostgresMain
2238 * will reset those to "idle". But we must reset debug_query_string to
2239 * ensure it doesn't become a dangling pointer.
2240 */
2241 debug_query_string = NULL;
2242
2243 return true;
2244}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:65
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:408
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:403
MemoryContext TopMemoryContext
Definition: mcxt.c:166
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:89
#define InvalidOid
Definition: postgres_ext.h:37
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:599
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1416
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:581
WalSnd * MyWalSnd
Definition: walsender.c:120
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:482
static StringInfoData tmpbuf
Definition: walsender.c:178
static void IdentifySystem(void)
Definition: walsender.c:401
static StringInfoData reply_message
Definition: walsender.c:177
void WalSndSetState(WalSndState state)
Definition: walsender.c:3943
static StringInfoData output_message
Definition: walsender.c:176
static void UploadManifest(void)
Definition: walsender.c:671
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:206
bool log_replication_commands
Definition: walsender.c:133
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1195
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1458
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:155
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1407
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:813
static XLogReaderState * xlogreader
Definition: walsender.c:145
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3669
void StartTransactionCommand(void)
Definition: xact.c:3080
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:408
void CommitTransactionCommand(void)
Definition: xact.c:3178

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID tli)

Definition at line 3638 of file walsender.c.

3639{
3640 XLogRecPtr replayPtr;
3641 TimeLineID replayTLI;
3642 XLogRecPtr receivePtr;
3643 TimeLineID receiveTLI;
3644 XLogRecPtr result;
3645
3646 Assert(am_cascading_walsender || IsSyncingReplicationSlots());
3647
3648 /*
3649 * We can safely send what's already been replayed. Also, if walreceiver
3650 * is streaming WAL from the same timeline, we can send anything that it
3651 * has streamed, but hasn't been replayed yet.
3652 */
3653
3654 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3655 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3656
3657 if (tli)
3658 *tli = replayTLI;
3659
3660 result = replayPtr;
3661 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3662 result = receivePtr;
3663
3664 return result;
3665}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1882
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:124
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:63
static TimeLineID receiveTLI
Definition: xlogrecovery.c:266
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *  offset,
IncrementalBackupInfo *  ib 
)
static

Definition at line 737 of file walsender.c.

739{
740 int mtype;
741 int maxmsglen;
742
743 HOLD_CANCEL_INTERRUPTS();
744
745 pq_startmsgread();
746 mtype = pq_getbyte();
747 if (mtype == EOF)
748 ereport(ERROR,
749 (errcode(ERRCODE_CONNECTION_FAILURE),
750 errmsg("unexpected EOF on client connection with an open transaction")));
751
752 switch (mtype)
753 {
754 case PqMsg_CopyData:
755 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
756 break;
757 case PqMsg_CopyDone:
758 case PqMsg_CopyFail:
759 case PqMsg_Flush:
760 case PqMsg_Sync:
761 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
762 break;
763 default:
764 ereport(ERROR,
765 (errcode(ERRCODE_PROTOCOL_VIOLATION),
766 errmsg("unexpected message type 0x%02X during COPY from stdin",
767 mtype)));
768 maxmsglen = 0; /* keep compiler quiet */
769 break;
770 }
771
772 /* Now collect the message body */
773 if (pq_getmessage(buf, maxmsglen))
774 ereport(ERROR,
775 (errcode(ERRCODE_CONNECTION_FAILURE),
776 errmsg("unexpected EOF on client connection with an open transaction")));
777 RESUME_CANCEL_INTERRUPTS();
778
779 /* Process the message */
780 switch (mtype)
781 {
782 case PqMsg_CopyData:
783 AppendIncrementalManifestData(ib, buf->data, buf->len);
784 return true;
785
786 case PqMsg_CopyDone:
787 return false;
788
789 case PqMsg_Sync:
790 case PqMsg_Flush:
791 /* Ignore these while in CopyOut mode as we do elsewhere. */
792 return true;
793
794 case PqMsg_CopyFail:
795 ereport(ERROR,
796 (errcode(ERRCODE_QUERY_CANCELED),
797 errmsg("COPY from stdin failed: %s",
798 pq_getmsgstring(buf))));
799 }
800
801 /* Not reached. */
802 Assert(false);
803 return false;
804}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:142
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:144
static char buf[DEFAULT_XLOG_SEG_SIZE]
Definition: pg_test_fsync.c:71
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1203
int pq_getbyte(void)
Definition: pqcomm.c:963
void pq_startmsgread(void)
Definition: pqcomm.c:1141
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:578
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Sync
Definition: protocol.h:27
#define PqMsg_CopyFail
Definition: protocol.h:29
#define PqMsg_Flush
Definition: protocol.h:24

References AppendIncrementalManifestData(), Assert(), buf, ereport, errcode(), errmsg(), ERROR, HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_CopyFail, PqMsg_Flush, PqMsg_Sync, and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3694 of file walsender.c.

3695{
3697 Assert(am_walsender);
3698 /*
3699 * If replication has not yet started, die like with SIGTERM. If
3700 * replication is active, only set a flag and wake up the main loop. It
3701 * will send any outstanding WAL, wait for it to be replicated to the
3702 * standby, and then exit gracefully.
3703 */
3704 if (!replication_active)
3705 kill(MyProcPid, SIGTERM);
3706 else
3707 got_STOPPING = true;
3708}
int MyProcPid
Definition: globals.c:47
bool am_walsender
Definition: walsender.c:123
static volatile sig_atomic_t replication_active
Definition: walsender.c:214
#define kill(pid, sig)
Definition: win32_port.h:490

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 401 of file walsender.c.

402{
403 char sysid[32];
404 char xloc[MAXFNAMELEN];
405 XLogRecPtr logptr;
406 char *dbname = NULL;
407 DestReceiver *dest;
408 TupOutputState *tstate;
409 TupleDesc tupdesc;
410 Datum values[4];
411 bool nulls[4] = {0};
412 TimeLineID currTLI;
413
414 /*
415 * Reply with a result set with one row, four columns. First col is system
416 * ID, second is timeline ID, third is current xlog location and the
417 * fourth contains the database name if we are connected to one.
418 */
419
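420 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
421 GetSystemIdentifier());
422
423 am_cascading_walsender = RecoveryInProgress();
424 if (am_cascading_walsender)
425 logptr = GetStandbyFlushRecPtr(&currTLI);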
426 else
427 logptr = GetFlushRecPtr(&currTLI);
428
429 snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
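430
431 if (MyDatabaseId != InvalidOid)
432 {
433 MemoryContext cur = CurrentMemoryContext;
434
435 /* syscache access needs a transaction env. */
436 StartTransactionCommand();
437 dbname = get_database_name(MyDatabaseId);
438 /* copy dbname out of TX context */
439 dbname = MemoryContextStrdup(cur, dbname);
440 CommitTransactionCommand();
441 }
442
443 dest = CreateDestReceiver(DestRemoteSimple);
444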
445 /* need a tuple descriptor representing four columns */
446 tupdesc = CreateTemplateTupleDesc(4);
447 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
448 TEXTOID, -1, 0);
449 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
450 INT8OID, -1, 0);
451 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
452 TEXTOID, -1, 0);
453 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
454 TEXTOID, -1, 0);
455
456 /* prepare for projection of tuples */
457 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
458
459 /* column 1: system identifier */
460 values[0] = CStringGetTextDatum(sysid);
461
462 /* column 2: timeline */
463 values[1] = Int64GetDatum(currTLI);
464
465 /* column 3: wal location */
466 values[2] = CStringGetTextDatum(xloc);
467
468 /* column 4: database name, or NULL if none */
469 if (dbname)
470 values[3] = CStringGetTextDatum(dbname);
471 else
472 nulls[3] = true;
473
474 /* send it to dest */
475 do_tup_output(tstate, values, nulls);
476
477 end_tup_output(tstate);
478}
#define UINT64_FORMAT
Definition: c.h:571
struct cursor * cur
Definition: ecpg.c:29
char * get_database_name(Oid dbid)
Definition: lsyscache.c:1242
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1768
static Datum Int64GetDatum(int64 X)
Definition: postgres.h:423
char * dbname
Definition: streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3638
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4628
bool RecoveryInProgress(void)
Definition: xlog.c:6461
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6626

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 301 of file walsender.c.

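302{
303 am_cascading_walsender = RecoveryInProgress();
304
305 /* Create a per-walsender data structure in shared memory */
306 InitWalSenderSlot();
307
308 /* need resource owner for e.g. basebackups */
309 CreateAuxProcessResourceOwner();
310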
311 /*
312 * Let postmaster know that we're a WAL sender. Once we've declared us as
313 * a WAL sender process, postmaster will let us outlive the bgwriter and
314 * kill us last in the shutdown sequence, so we get a chance to stream all
315 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
316 * there's no going back, and we mustn't write any WAL records after this.
317 */
318 MarkPostmasterChildWalSender();
319 SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
320
321 /*
322 * If the client didn't specify a database to connect to, show in PGPROC
323 * that our advertised xmin should affect vacuum horizons in all
324 * databases. This allows physical replication clients to send hot
325 * standby feedback that will delay vacuum cleanup in all databases.
326 */
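327 if (MyDatabaseId == InvalidOid)
328 {
329 Assert(MyProc->xmin == InvalidTransactionId);
330 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
331 MyProc->statusFlags |= PROC_AFFECTS_ALL_HORIZONS;
332 ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
333 LWLockRelease(ProcArrayLock);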
334 }
335
336 /* Initialize empty timestamp buffer for lag tracking. */
337 lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
338}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1178
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1898
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1266
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:996
PROC_HDR * ProcGlobal
Definition: proc.c:79
TransactionId xmin
Definition: proc.h:194
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:3022
static LagTracker * lag_tracker
Definition: walsender.c:252

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 3022 of file walsender.c.

3023{
3024 int i;
3025
3026 /*
3027 * WalSndCtl should be set up already (we inherit this by fork() or
3028 * EXEC_BACKEND mechanism from the postmaster).
3029 */
3030 Assert(WalSndCtl != NULL);
3031 Assert(MyWalSnd == NULL);
3032
3033 /*
3034 * Find a free walsender slot and reserve it. This must not fail due to
3035 * the prior check for free WAL senders in InitProcess().
3036 */
3037 for (i = 0; i < max_wal_senders; i++)
3038 {
3039 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3040
3041 SpinLockAcquire(&walsnd->mutex);
3042
3043 if (walsnd->pid != 0)
3044 {
3045 SpinLockRelease(&walsnd->mutex);
3046 continue;
3047 }
3048 else
3049 {
3050 /*
3051 * Found a free slot. Reserve it for us.
3052 */
3053 walsnd->pid = MyProcPid;
3054 walsnd->state = WALSNDSTATE_STARTUP;
3055 walsnd->sentPtr = InvalidXLogRecPtr;
3056 walsnd->needreload = false;
3057 walsnd->write = InvalidXLogRecPtr;
3058 walsnd->flush = InvalidXLogRecPtr;
3059 walsnd->apply = InvalidXLogRecPtr;
3060 walsnd->writeLag = -1;
3061 walsnd->flushLag = -1;
3062 walsnd->applyLag = -1;
3063 walsnd->sync_standby_priority = 0;
3064 walsnd->replyTime = 0;
3065
3066 /*
3067 * The kind assignment is done here and not in StartReplication()
3068 * and StartLogicalReplication(). Indeed, the logical walsender
3069 * needs to read WAL records (like snapshot of running
3070 * transactions) during the slot creation. So it needs to be woken
3071 * up based on its kind.
3072 *
3073 * The kind assignment could also be done in StartReplication(),
3074 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
3075 * seems better to set it on one place.
3076 */
3077 if (MyDatabaseId == InvalidOid)
3078 walsnd->kind = REPLICATION_KIND_PHYSICAL;
3079 else
3080 walsnd->kind = REPLICATION_KIND_LOGICAL;
3081
3082 SpinLockRelease(&walsnd->mutex);
3083 /* don't need the lock anymore */
3084 MyWalSnd = walsnd;
3085
3086 break;
3087 }
3088 }
3089
3090 Assert(MyWalSnd != NULL);
3091
3092 /* Arrange to clean up at walsender exit */
3093 on_shmem_exit(WalSndKill, 0);
3094}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:77
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:129
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:3098
WalSndCtlData * WalSndCtl
Definition: walsender.c:117
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert(), WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4287 of file walsender.c.

4288{
4289 TimestampTz time = 0;
4290
4291 /*
4292 * If 'lsn' has not passed the WAL position stored in the overflow entry,
4293 * return the elapsed time (in microseconds) since the saved local flush
4294 * time. If the flush time is in the future (due to clock drift), return
4295 * -1 to treat as no valid sample.
4296 *
4297 * Otherwise, switch back to using the buffer to control the read head and
4298 * compute the elapsed time. The read head is then reset to point to the
4299 * oldest entry in the buffer.
4300 */
4301 if (lag_tracker->read_heads[head] == -1)
4302 {
4303 if (lag_tracker->overflowed[head].lsn > lsn)
4304 return (now >= lag_tracker->overflowed[head].time) ?
4305 now - lag_tracker->overflowed[head].time : -1;
4306
4307 time = lag_tracker->overflowed[head].time;
4309 lag_tracker->read_heads[head] =
4311 }
4312
4313 /* Read all unread samples up to this LSN or end of buffer. */
4314 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4316 {
4318 lag_tracker->last_read[head] =
4320 lag_tracker->read_heads[head] =
4322 }
4323
4324 /*
4325 * If the lag tracker is empty, that means the standby has processed
4326 * everything we've ever sent so we should now clear 'last_read'. If we
4327 * didn't do that, we'd risk using a stale and irrelevant sample for
4328 * interpolation at the beginning of the next burst of WAL after a period
4329 * of idleness.
4330 */
4332 lag_tracker->last_read[head].time = 0;
4333
4334 if (time > now)
4335 {
4336 /* If the clock somehow went backwards, treat as not found. */
4337 return -1;
4338 }
4339 else if (time == 0)
4340 {
4341 /*
4342 * We didn't cross a time. If there is a future sample that we
4343 * haven't reached yet, and we've already reached at least one sample,
4344 * let's interpolate the local flushed time. This is mainly useful
4345 * for reporting a completely stuck apply position as having
4346 * increasing lag, since otherwise we'd have to wait for it to
4347 * eventually start moving again and cross one of our samples before
4348 * we can show the lag increasing.
4349 */
4351 {
4352 /* There are no future samples, so we can't interpolate. */
4353 return -1;
4354 }
4355 else if (lag_tracker->last_read[head].time != 0)
4356 {
4357 /* We can interpolate between last_read and the next sample. */
4358 double fraction;
4359 WalTimeSample prev = lag_tracker->last_read[head];
4361
4362 if (lsn < prev.lsn)
4363 {
4364 /*
4365 * Reported LSNs shouldn't normally go backwards, but it's
4366 * possible when there is a timeline change. Treat as not
4367 * found.
4368 */
4369 return -1;
4370 }
4371
4372 Assert(prev.lsn < next.lsn);
4373
4374 if (prev.time > next.time)
4375 {
4376 /* If the clock somehow went backwards, treat as not found. */
4377 return -1;
4378 }
4379
4380 /* See how far we are between the previous and next samples. */
4381 fraction =
4382 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4383
4384 /* Scale the local flush time proportionally. */
4385 time = (TimestampTz)
4386 ((double) prev.time + (next.time - prev.time) * fraction);
4387 }
4388 else
4389 {
4390 /*
4391 * We have only a future sample, implying that we were entirely
4392 * caught up but and now there is a new burst of WAL and the
4393 * standby hasn't processed the first sample yet. Until the
4394 * standby reaches the future sample the best we can do is report
4395 * the hypothetical lag if that sample were to be replayed now.
4396 */
4398 }
4399 }
4400
4401 /* Return the elapsed time since local flush time in microseconds. */
4402 Assert(time != 0);
4403 return now - time;
4404}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
static int32 next
Definition: blutils.c:225
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:232
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:234
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:235
int write_head
Definition: walsender.c:233
WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:249
TimestampTz time
Definition: walsender.c:222
XLogRecPtr lsn
Definition: walsender.c:221
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:226

References Assert(), LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4229 of file walsender.c.

4230{
4231 int new_write_head;
4232 int i;
4233
4234 if (!am_walsender)
4235 return;
4236
4237 /*
4238 * If the lsn hasn't advanced since last time, then do nothing. This way
4239 * we only record a new sample when new WAL has been written.
4240 */
4241 if (lag_tracker->last_lsn == lsn)
4242 return;
4243 lag_tracker->last_lsn = lsn;
4244
4245 /*
4246 * If advancing the write head of the circular buffer would crash into any
4247 * of the read heads, then the buffer is full. In other words, the
4248 * slowest reader (presumably apply) is the one that controls the release
4249 * of space.
4250 */
4251 new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4252 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4253 {
4254 /*
4255 * If the buffer is full, move the slowest reader to a separate
4256 * overflow entry and free its space in the buffer so the write head
4257 * can advance.
4258 */
4259 if (new_write_head == lag_tracker->read_heads[i])
4260 {
4263 lag_tracker->read_heads[i] = -1;
4264 }
4265 }
4266
4267 /* Store a sample at the current write head position. */
4269 lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4270 lag_tracker->write_head = new_write_head;
4271}
XLogRecPtr last_lsn
Definition: walsender.c:231
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *  state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1045 of file walsender.c.

1047{
1048 XLogRecPtr flushptr;
1049 int count;
1050 WALReadError errinfo;
1051 XLogSegNo segno;
1052 TimeLineID currTLI;
1053
1054 /*
1055 * Make sure we have enough WAL available before retrieving the current
1056 * timeline.
1057 */
1058 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1059
1060 /* Fail if not enough (implies we are going to shut down) */
1061 if (flushptr < targetPagePtr + reqLen)
1062 return -1;
1063
1064 /*
1065 * Since logical decoding is also permitted on a standby server, we need
1066 * to check if the server is in recovery to decide how to get the current
1067 * timeline ID (so that it also covers the promotion or timeline change
1068 * cases). We must determine am_cascading_walsender after waiting for the
1069 * required WAL so that it is correct when the walsender wakes up after a
1070 * promotion.
1071 */
1072 am_cascading_walsender = RecoveryInProgress();
1073
1074 if (am_cascading_walsender)
1075 GetXLogReplayRecPtr(&currTLI);
1076 else
1077 currTLI = GetWALInsertionTimeLine();
1078
1079 XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1080 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1081 sendTimeLine = state->currTLI;
1082 sendTimeLineValidUpto = state->currTLIValidUntil;
1083 sendTimeLineNextTLI = state->nextTLI;
1084
1085 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1086 count = XLOG_BLCKSZ; /* more than one block available */
1087 else
1088 count = flushptr - targetPagePtr; /* part of the page available */
1089
1090 /* now actually read the data, we know it's there */
1091 if (!WALRead(state,
1092 cur_page,
1093 targetPagePtr,
1094 count,
1095 currTLI, /* Pass the current TLI because only
1096 * WalSndSegmentOpen controls whether new TLI
1097 * is needed. */
1098 &errinfo))
1099 WALReadRaiseError(&errinfo);
1100
1101 /*
1102 * After reading into the buffer, check that what we read was valid. We do
1103 * this after reading, because even though the segment was present when we
1104 * opened it, it might get recycled or removed while we read it. The
1105 * read() succeeds in that case, but the data we tried to read might
1106 * already have been overwritten with new WAL records.
1107 */
1108 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1109 CheckXLogRemoved(segno, state->seg.ws_tli);
1110
1111 return count;
1112}
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:164
static bool sendTimeLineIsHistoric
Definition: walsender.c:166
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1824
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:165
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:167
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6647
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3765
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:52
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1514
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 *  wait_event 
)
static

Definition at line 1764 of file walsender.c.

1765{
1766 int elevel = got_STOPPING ? ERROR : WARNING;
1767 bool failover_slot;
1768
1769 failover_slot = (replication_active && MyReplicationSlot->data.failover);
1770
1771 /*
1772 * Note that after receiving the shutdown signal, an ERROR is reported if
1773 * any slots are dropped, invalidated, or inactive. This measure is taken
1774 * to prevent the walsender from waiting indefinitely.
1775 */
1776 if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
1777 {
1778 *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1779 return true;
1780 }
1781
1782 *wait_event = 0;
1783 return false;
1784}
#define WARNING
Definition: elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:3083

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 *  wait_event 
)
static

Definition at line 1796 of file walsender.c.

1798{
1799 /* Check if we need to wait for WALs to be flushed to disk */
1800 if (target_lsn > flushed_lsn)
1801 {
1802 *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1803 return true;
1804 }
1805
1806 /* Check if the standby slots have caught up to the flushed position */
1807 return NeedToWaitForStandbys(flushed_lsn, wait_event);
1808}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1764

References NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 3981 of file walsender.c.

3982{
3983 Interval *result = palloc_object(Interval);
3984
3985 result->month = 0;
3986 result->day = 0;
3987 result->time = offset;
3988
3989 return result;
3990}
#define palloc_object(type)
Definition: fe_memutils.h:74
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc_object, and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool *  reserve_wal,
CRSSnapshotAction *  snapshot_action,
bool *  two_phase,
bool *  failover 
)
static

Definition at line 1118 of file walsender.c.

1122{
1123 ListCell *lc;
1124 bool snapshot_action_given = false;
1125 bool reserve_wal_given = false;
1126 bool two_phase_given = false;
1127 bool failover_given = false;
1128
1129 /* Parse options */
1130 foreach(lc, cmd->options)
1131 {
1132 DefElem *defel = (DefElem *) lfirst(lc);
1133
1134 if (strcmp(defel->defname, "snapshot") == 0)
1135 {
1136 char *action;
1137
1138 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1139 ereport(ERROR,
1140 (errcode(ERRCODE_SYNTAX_ERROR),
1141 errmsg("conflicting or redundant options")));
1142
1143 action = defGetString(defel);
1144 snapshot_action_given = true;
1145
1146 if (strcmp(action, "export") == 0)
1147 *snapshot_action = CRS_EXPORT_SNAPSHOT;
1148 else if (strcmp(action, "nothing") == 0)
1149 *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1150 else if (strcmp(action, "use") == 0)
1151 *snapshot_action = CRS_USE_SNAPSHOT;
1152 else
1153 ereport(ERROR,
1154 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1155 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
1156 "CREATE_REPLICATION_SLOT", defel->defname, action)));
1157 }
1158 else if (strcmp(defel->defname, "reserve_wal") == 0)
1159 {
1160 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1161 ereport(ERROR,
1162 (errcode(ERRCODE_SYNTAX_ERROR),
1163 errmsg("conflicting or redundant options")));
1164
1165 reserve_wal_given = true;
1166 *reserve_wal = defGetBoolean(defel);
1167 }
1168 else if (strcmp(defel->defname, "two_phase") == 0)
1169 {
1170 if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1171 ereport(ERROR,
1172 (errcode(ERRCODE_SYNTAX_ERROR),
1173 errmsg("conflicting or redundant options")));
1174 two_phase_given = true;
1175 *two_phase = defGetBoolean(defel);
1176 }
1177 else if (strcmp(defel->defname, "failover") == 0)
1178 {
1179 if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1180 ereport(ERROR,
1181 (errcode(ERRCODE_SYNTAX_ERROR),
1182 errmsg("conflicting or redundant options")));
1183 failover_given = true;
1184 *failover = defGetBoolean(defel);
1185 }
1186 else
1187 elog(ERROR, "unrecognized option: %s", defel->defname);
1188 }
1189}
char * defGetString(DefElem *def)
Definition: define.c:35
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:844
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, failover, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3997 of file walsender.c.

3998{
3999#define PG_STAT_GET_WAL_SENDERS_COLS 12
4000 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4001 SyncRepStandbyData *sync_standbys;
4002 int num_standbys;
4003 int i;
4004
4005 InitMaterializedSRF(fcinfo, 0);
4006
4007 /*
4008 * Get the currently active synchronous standbys. This could be out of
4009 * date before we're done, but we'll use the data anyway.
4010 */
4011 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
4012
4013 for (i = 0; i < max_wal_senders; i++)
4014 {
4015 WalSnd *walsnd = &WalSndCtl->walsnds[i];
4016 XLogRecPtr sent_ptr;
4017 XLogRecPtr write;
4018 XLogRecPtr flush;
4019 XLogRecPtr apply;
4020 TimeOffset writeLag;
4021 TimeOffset flushLag;
4022 TimeOffset applyLag;
4023 int priority;
4024 int pid;
4025 WalSndState state;
4026 TimestampTz replyTime;
4027 bool is_sync_standby;
4028 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
4029 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
4030 int j;
4031
4032 /* Collect data from shared memory */
4033 SpinLockAcquire(&walsnd->mutex);
4034 if (walsnd->pid == 0)
4035 {
4036 SpinLockRelease(&walsnd->mutex);
4037 continue;
4038 }
4039 pid = walsnd->pid;
4040 sent_ptr = walsnd->sentPtr;
4041 state = walsnd->state;
4042 write = walsnd->write;
4043 flush = walsnd->flush;
4044 apply = walsnd->apply;
4045 writeLag = walsnd->writeLag;
4046 flushLag = walsnd->flushLag;
4047 applyLag = walsnd->applyLag;
4048 priority = walsnd->sync_standby_priority;
4049 replyTime = walsnd->replyTime;
4050 SpinLockRelease(&walsnd->mutex);
4051
4052 /*
4053 * Detect whether walsender is/was considered synchronous. We can
4054 * provide some protection against stale data by checking the PID
4055 * along with walsnd_index.
4056 */
4057 is_sync_standby = false;
4058 for (j = 0; j < num_standbys; j++)
4059 {
4060 if (sync_standbys[j].walsnd_index == i &&
4061 sync_standbys[j].pid == pid)
4062 {
4063 is_sync_standby = true;
4064 break;
4065 }
4066 }
4067
4068 values[0] = Int32GetDatum(pid);
4069
4070 if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
4071 {
4072 /*
4073 * Only superusers and roles with privileges of pg_read_all_stats
4074 * can see details. Other users only get the pid value to know
4075 * it's a walsender, but no details.
4076 */
4077 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4078 }
4079 else
4080 {
4081 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
4082
4083 if (!XLogRecPtrIsValid(sent_ptr))
4084 nulls[2] = true;
4085 values[2] = LSNGetDatum(sent_ptr);
4086
4087 if (!XLogRecPtrIsValid(write))
4088 nulls[3] = true;
4089 values[3] = LSNGetDatum(write);
4090
4091 if (!XLogRecPtrIsValid(flush))
4092 nulls[4] = true;
4093 values[4] = LSNGetDatum(flush);
4094
4095 if (!XLogRecPtrIsValid(apply))
4096 nulls[5] = true;
4097 values[5] = LSNGetDatum(apply);
4098
4099 /*
4100 * Treat a standby such as a pg_basebackup background process
4101 * which always returns an invalid flush location, as an
4102 * asynchronous standby.
4103 */
4104 priority = XLogRecPtrIsValid(flush) ? priority : 0;
4105
4106 if (writeLag < 0)
4107 nulls[6] = true;
4108 else
4109 values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
4110
4111 if (flushLag < 0)
4112 nulls[7] = true;
4113 else
4114 values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
4115
4116 if (applyLag < 0)
4117 nulls[8] = true;
4118 else
4119 values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
4120
4121 values[9] = Int32GetDatum(priority);
4122
4123 /*
4124 * More easily understood version of standby state. This is purely
4125 * informational.
4126 *
4127 * In quorum-based sync replication, the role of each standby
4128 * listed in synchronous_standby_names can be changing very
4129 * frequently. Any standbys considered as "sync" at one moment can
4130 * be switched to "potential" ones at the next moment. So, it's
4131 * basically useless to report "sync" or "potential" as their sync
4132 * states. We report just "quorum" for them.
4133 */
4134 if (priority == 0)
4135 values[10] = CStringGetTextDatum("async");
4136 else if (is_sync_standby)
4137 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
4138 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4139 else
4140 values[10] = CStringGetTextDatum("potential");
4141
4142 if (replyTime == 0)
4143 nulls[11] = true;
4144 else
4145 values[11] = TimestampTzGetDatum(replyTime);
4146 }
4147
4148 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4149 values, nulls);
4150 }
4151
4152 return (Datum) 0;
4153}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5284
#define MemSet(start, val, len)
Definition: c.h:1019
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:78
Oid GetUserId(void)
Definition: miscinit.c:469
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
TupleDesc setDesc
Definition: execnodes.h:364
Tuplestorestate * setResult
Definition: execnodes.h:363
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:754
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3981
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3962
WalSndState
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsValid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2402 of file walsender.c.

2403{
2404 bool changed = false;
2405 ReplicationSlot *slot = MyReplicationSlot;
2406
2407 Assert(XLogRecPtrIsValid(lsn));
2408 SpinLockAcquire(&slot->mutex);
2409 if (slot->data.restart_lsn != lsn)
2410 {
2411 changed = true;
2412 slot->data.restart_lsn = lsn;
2413 }
2414 SpinLockRelease(&slot->mutex);
2415
2416 if (changed)
2417 {
2418 ReplicationSlotMarkDirty();
2419 ReplicationSlotsComputeRequiredLSN();
2420 PhysicalWakeupLogicalWalSnd();
2421 }
2422
2423 /*
2424 * One could argue that the slot should be saved to disk now, but that'd
2425 * be energy wasted - the worst thing lost information could cause here is
2426 * to give wrong information in a statistics view - we'll just potentially
2427 * be more conservative in removing files.
2428 */
2429}
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1297
slock_t mutex
Definition: slot.h:183
void PhysicalWakeupLogicalWalSnd(void)
Definition: walsender.c:1739

References Assert(), ReplicationSlot::data, ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, SpinLockRelease, and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2540 of file walsender.c.

2541{
2542 bool changed = false;
2543 ReplicationSlot *slot = MyReplicationSlot;
2544
2545 SpinLockAcquire(&slot->mutex);
2546 MyProc->xmin = InvalidTransactionId;
2547
2548 /*
2549 * For physical replication we don't need the interlock provided by xmin
2550 * and effective_xmin since the consequences of a missed increase are
2551 * limited to query cancellations, so set both at once.
2552 */
2553 if (!TransactionIdIsNormal(slot->data.xmin) ||
2554 !TransactionIdIsNormal(feedbackXmin) ||
2555 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2556 {
2557 changed = true;
2558 slot->data.xmin = feedbackXmin;
2559 slot->effective_xmin = feedbackXmin;
2560 }
2561 if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2562 !TransactionIdIsNormal(feedbackCatalogXmin) ||
2563 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2564 {
2565 changed = true;
2566 slot->data.catalog_xmin = feedbackCatalogXmin;
2567 slot->effective_catalog_xmin = feedbackCatalogXmin;
2568 }
2569 SpinLockRelease(&slot->mutex);
2570
2571 if (changed)
2572 {
2573 ReplicationSlotMarkDirty();
2574 ReplicationSlotsComputeRequiredXmin(false);
2575 }
2576}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1215
TransactionId xmin
Definition: slot.h:114
TransactionId catalog_xmin
Definition: slot.h:122
TransactionId effective_catalog_xmin
Definition: slot.h:207
TransactionId effective_xmin
Definition: slot.h:206
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1739 of file walsender.c.

1740{
1741 Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
1742
1743 /*
1744 * If we are running in a standby, there is no need to wake up walsenders.
1745 * This is because we do not support syncing slots to cascading standbys,
1746 * so, there are no walsenders waiting for standbys to catch up.
1747 */
1748 if (RecoveryInProgress())
1749 return;
1750
1751 if (SlotExistsInSyncStandbySlots(NameStr(MyReplicationSlot->data.name)))
1752 ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
1753}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:3050
#define SlotIsPhysical(slot)
Definition: slot.h:284
ConditionVariable wal_confirm_rcv_cv

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1620 of file walsender.c.

1621{
1622 for (;;)
1623 {
1624 long sleeptime;
1625
1626 /* Check for input from the client */
1627 ProcessRepliesIfAny();
1628
1629 /* die if timeout was reached */
1630 WalSndCheckTimeOut();
1631
1632 /* Send keepalive if the time has come */
1633 WalSndKeepaliveIfNecessary();
1634
1635 if (!pq_is_send_pending())
1636 break;
1637
1638 sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1639
1640 /* Sleep until something happens or we time out */
1641 WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1642 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1643
1644 /* Clear any already-pending wakeups */
1645 ResetLatch(MyLatch);
1646
1647 CHECK_FOR_INTERRUPTS();
1648
1649 /* Process any requests or signals received recently */
1650 if (ConfigReloadPending)
1651 {
1652 ConfigReloadPending = false;
1653 ProcessConfigFile(PGC_SIGHUP);
1654 SyncRepInitConfig();
1655 }
1656
1657 /* Try to flush pending output to the client */
1658 if (pq_flush_if_writable() != 0)
1659 WalSndShutdown();
1660 }
1661
1662 /* reactivate latch so WalSndLoop knows to continue */
1663 SetLatch(MyLatch);
1664}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
struct Latch * MyLatch
Definition: globals.c:63
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:290
void ResetLatch(Latch *latch)
Definition: latch.c:374
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:445
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_SOCKET_WRITEABLE
Definition: waiteventset.h:36
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3821
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2853
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2251
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:4191
static pg_noreturn void WalSndShutdown(void)
Definition: walsender.c:384
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2809

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2251 of file walsender.c.

2252{
2253 unsigned char firstchar;
2254 int maxmsglen;
2255 int r;
2256 bool received = false;
2257
2258 last_processing = GetCurrentTimestamp();
2259
2260 /*
2261 * If we already received a CopyDone from the frontend, any subsequent
2262 * message is the beginning of a new command, and should be processed in
2263 * the main processing loop.
2264 */
2265 while (!streamingDoneReceiving)
2266 {
2267 pq_startmsgread();
2268 r = pq_getbyte_if_available(&firstchar);
2269 if (r < 0)
2270 {
2271 /* unexpected error or EOF */
2272 ereport(COMMERROR,
2273 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2274 errmsg("unexpected EOF on standby connection")));
2275 proc_exit(0);
2276 }
2277 if (r == 0)
2278 {
2279 /* no data available without blocking */
2280 pq_endmsgread();
2281 break;
2282 }
2283
2284 /* Validate message type and set packet size limit */
2285 switch (firstchar)
2286 {
2287 case PqMsg_CopyData:
2288 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2289 break;
2290 case PqMsg_CopyDone:
2291 case PqMsg_Terminate:
2292 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2293 break;
2294 default:
2295 ereport(FATAL,
2296 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2297 errmsg("invalid standby message type \"%c\"",
2298 firstchar)));
2299 maxmsglen = 0; /* keep compiler quiet */
2300 break;
2301 }
2302
2303 /* Read the message contents */
2304 resetStringInfo(&reply_message);
2305 if (pq_getmessage(&reply_message, maxmsglen))
2306 {
2307 ereport(COMMERROR,
2308 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2309 errmsg("unexpected EOF on standby connection")));
2310 proc_exit(0);
2311 }
2312
2313 /* ... and process it */
2314 switch (firstchar)
2315 {
2316 /*
2317 * PqMsg_CopyData means a standby reply wrapped in a CopyData
2318 * packet.
2319 */
2320 case PqMsg_CopyData:
2321 ProcessStandbyMessage();
2322 received = true;
2323 break;
2324
2325 /*
2326 * PqMsg_CopyDone means the standby requested to finish
2327 * streaming. Reply with CopyDone, if we had not sent that
2328 * already.
2329 */
2330 case PqMsg_CopyDone:
2331 if (!streamingDoneSending)
2332 {
2333 pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
2334 streamingDoneSending = true;
2335 }
2336
2337 streamingDoneReceiving = true;
2338 received = true;
2339 break;
2340
2341 /*
2342 * PqMsg_Terminate means that the standby is closing down the
2343 * socket.
2344 */
2345 case PqMsg_Terminate:
2346 proc_exit(0);
2347
2348 default:
2349 Assert(false); /* NOT REACHED */
2350 }
2351 }
2352
2353 /*
2354 * Save the last reply timestamp if we've received at least one reply.
2355 */
2356 if (received)
2357 {
2358 last_reply_timestamp = last_processing;
2359 waiting_for_ping_response = false;
2360 }
2361}
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1003
void pq_endmsgread(void)
Definition: pqcomm.c:1165
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
static bool waiting_for_ping_response
Definition: walsender.c:190
static TimestampTz last_processing
Definition: walsender.c:181
static bool streamingDoneSending
Definition: walsender.c:198
static void ProcessStandbyMessage(void)
Definition: walsender.c:2367
static bool streamingDoneReceiving
Definition: walsender.c:199

References Assert(), COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2620 of file walsender.c.

2621{
2622 TransactionId feedbackXmin;
2623 uint32 feedbackEpoch;
2624 TransactionId feedbackCatalogXmin;
2625 uint32 feedbackCatalogEpoch;
2626 TimestampTz replyTime;
2627
2628 /*
2629 * Decipher the reply message. The caller already consumed the msgtype
2630 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2631 * of this message.
2632 */
2633 replyTime = pq_getmsgint64(&reply_message);
2634 feedbackXmin = pq_getmsgint(&reply_message, 4);
2635 feedbackEpoch = pq_getmsgint(&reply_message, 4);
2636 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2637 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2638
2639 if (message_level_is_interesting(DEBUG2))
2640 {
2641 char *replyTimeStr;
2642
2643 /* Copy because timestamptz_to_str returns a static buffer */
2644 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2645
2646 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2647 feedbackXmin,
2648 feedbackEpoch,
2649 feedbackCatalogXmin,
2650 feedbackCatalogEpoch,
2651 replyTimeStr);
2652
2653 pfree(replyTimeStr);
2654 }
2655
2656 /*
2657 * Update shared state for this WalSender process based on reply data from
2658 * standby.
2659 */
2660 {
2661 WalSnd *walsnd = MyWalSnd;
2662
2663 SpinLockAcquire(&walsnd->mutex);
2664 walsnd->replyTime = replyTime;
2665 SpinLockRelease(&walsnd->mutex);
2666 }
2667
2668 /*
2669 * Unset WalSender's xmins if the feedback message values are invalid.
2670 * This happens when the downstream turned hot_standby_feedback off.
2671 */
2672 if (!TransactionIdIsNormal(feedbackXmin)
2673 && !TransactionIdIsNormal(feedbackCatalogXmin))
2674 {
2675 MyProc->xmin = InvalidTransactionId;
2676 if (MyReplicationSlot != NULL)
2677 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2678 return;
2679 }
2680
2681 /*
2682 * Check that the provided xmin/epoch are sane, that is, not in the future
2683 * and not so far back as to be already wrapped around. Ignore if not.
2684 */
2685 if (TransactionIdIsNormal(feedbackXmin) &&
2686 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2687 return;
2688
2689 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2690 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2691 return;
2692
2693 /*
2694 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2695 * the xmin will be taken into account by GetSnapshotData() /
2696 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2697 * thereby prevent the generation of cleanup conflicts on the standby
2698 * server.
2699 *
2700 * There is a small window for a race condition here: although we just
2701 * checked that feedbackXmin precedes nextXid, the nextXid could have
2702 * gotten advanced between our fetching it and applying the xmin below,
2703 * perhaps far enough to make feedbackXmin wrap around. In that case the
2704 * xmin we set here would be "in the future" and have no effect. No point
2705 * in worrying about this since it's too late to save the desired data
2706 * anyway. Assuming that the standby sends us an increasing sequence of
2707 * xmins, this could only happen during the first reply cycle, else our
2708 * own xmin would prevent nextXid from advancing so far.
2709 *
2710 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2711 * is assumed atomic, and there's no real need to prevent concurrent
2712 * horizon determinations. (If we're moving our xmin forward, this is
2713 * obviously safe, and if we're moving it backwards, well, the data is at
2714 * risk already since a VACUUM could already have determined the horizon.)
2715 *
2716 * If we're using a replication slot we reserve the xmin via that,
2717 * otherwise via the walsender's PGPROC entry. We can only track the
2718 * catalog xmin separately when using a slot, so we store the least of the
2719 * two provided when not using a slot.
2720 *
2721 * XXX: It might make sense to generalize the ephemeral slot concept and
2722 * always use the slot mechanism to handle the feedback xmin.
2723 */
2724 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2725 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2726 else
2727 {
2728 if (TransactionIdIsNormal(feedbackCatalogXmin)
2729 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2730 MyProc->xmin = feedbackCatalogXmin;
2731 else
2732 MyProc->xmin = feedbackXmin;
2733 }
2734}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
uint32_t uint32
Definition: c.h:552
uint32 TransactionId
Definition: c.h:672
bool message_level_is_interesting(int elevel)
Definition: elog.c:273
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1781
void pfree(void *pointer)
Definition: mcxt.c:1616
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:414
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:452
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2540
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2589

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2367 of file walsender.c.

2368{
2369 char msgtype;
2370
2371 /*
2372 * Check message type from the first byte.
2373 */
2374 msgtype = pq_getmsgbyte(&reply_message);
2375
2376 switch (msgtype)
2377 {
2378 case PqReplMsg_StandbyStatusUpdate:
2379 ProcessStandbyReplyMessage();
2380 break;
2381
2382 case PqReplMsg_HotStandbyFeedback:
2383 ProcessStandbyHSFeedbackMessage();
2384 break;
2385
2386 case PqReplMsg_PrimaryStatusRequest:
2387 ProcessStandbyPSRequestMessage();
2388 break;
2389
2390 default:
2391 ereport(COMMERROR,
2392 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2393 errmsg("unexpected message type \"%c\"", msgtype)));
2394 proc_exit(0);
2395 }
2396}
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:398
#define PqReplMsg_PrimaryStatusRequest
Definition: protocol.h:83
#define PqReplMsg_HotStandbyFeedback
Definition: protocol.h:82
#define PqReplMsg_StandbyStatusUpdate
Definition: protocol.h:84
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2620
static void ProcessStandbyPSRequestMessage(void)
Definition: walsender.c:2740
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2435

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), PqReplMsg_HotStandbyFeedback, PqReplMsg_PrimaryStatusRequest, PqReplMsg_StandbyStatusUpdate, proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyPSRequestMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyPSRequestMessage()

static void ProcessStandbyPSRequestMessage ( void  )
static

Definition at line 2740 of file walsender.c.

2741{
2742 XLogRecPtr lsn = InvalidXLogRecPtr;
2743 TransactionId oldestXidInCommit;
2744 TransactionId oldestGXidInCommit;
2745 FullTransactionId nextFullXid;
2746 FullTransactionId fullOldestXidInCommit;
2747 WalSnd *walsnd = MyWalSnd;
2748 TimestampTz replyTime;
2749
2750 /*
2751 * This shouldn't happen because we don't support getting primary status
2752 * message from standby.
2753 */
2754 if (RecoveryInProgress())
2755 elog(ERROR, "the primary status is unavailable during recovery");
2756
2757 replyTime = pq_getmsgint64(&reply_message);
2758
2759 /*
2760 * Update shared state for this WalSender process based on reply data from
2761 * standby.
2762 */
2763 SpinLockAcquire(&walsnd->mutex);
2764 walsnd->replyTime = replyTime;
2765 SpinLockRelease(&walsnd->mutex);
2766
2767 /*
2768 * Consider transactions in the current database, as only these are the
2769 * ones replicated.
2770 */
2771 oldestXidInCommit = GetOldestActiveTransactionId(true, false);
2772 oldestGXidInCommit = TwoPhaseGetOldestXidInCommit();
2773
2774 /*
2775 * Update the oldest xid for standby transmission if an older prepared
2776 * transaction exists and is currently in commit phase.
2777 */
2778 if (TransactionIdIsValid(oldestGXidInCommit) &&
2779 TransactionIdPrecedes(oldestGXidInCommit, oldestXidInCommit))
2780 oldestXidInCommit = oldestGXidInCommit;
2781
2782 nextFullXid = ReadNextFullTransactionId();
2783 fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
2784 oldestXidInCommit);
2785 lsn = GetXLogWriteRecPtr();
2786
2787 elog(DEBUG2, "sending primary status");
2788
2789 /* construct the message... */
2790 resetStringInfo(&output_message);
2791 pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
2792 pq_sendint64(&output_message, lsn);
2793 pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
2794 pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
2795 pq_sendint64(&output_message, GetCurrentTimestamp());
2796
2797 /* ... and send it wrapped in CopyData */
2798 pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
2799}
int64_t int64
Definition: c.h:549
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition: procarray.c:2835
#define PqReplMsg_PrimaryStatusUpdate
Definition: protocol.h:76
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
Definition: transam.h:443
#define U64FromFullTransactionId(x)
Definition: transam.h:49
#define TransactionIdIsValid(xid)
Definition: transam.h:41
TransactionId TwoPhaseGetOldestXidInCommit(void)
Definition: twophase.c:2829
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9612

References StringInfoData::data, DEBUG2, elog, ERROR, FullTransactionIdFromAllowableAt(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), GetXLogWriteRecPtr(), InvalidXLogRecPtr, StringInfoData::len, WalSnd::mutex, MyWalSnd, output_message, pq_getmsgint64(), pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_PrimaryStatusUpdate, ReadNextFullTransactionId(), RecoveryInProgress(), reply_message, WalSnd::replyTime, resetStringInfo(), SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, TransactionIdPrecedes(), TwoPhaseGetOldestXidInCommit(), and U64FromFullTransactionId.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2435 of file walsender.c.

2436{
2437 XLogRecPtr writePtr,
2438 flushPtr,
2439 applyPtr;
2440 bool replyRequested;
2441 TimeOffset writeLag,
2442 flushLag,
2443 applyLag;
2444 bool clearLagTimes;
2445 TimestampTz now;
2446 TimestampTz replyTime;
2447
2448 static bool fullyAppliedLastTime = false;
2449
2450 /* the caller already consumed the msgtype byte */
2451 writePtr = pq_getmsgint64(&reply_message);
2452 flushPtr = pq_getmsgint64(&reply_message);
2453 applyPtr = pq_getmsgint64(&reply_message);
2454 replyTime = pq_getmsgint64(&reply_message);
2455 replyRequested = pq_getmsgbyte(&reply_message);
2456
2457 if (message_level_is_interesting(DEBUG2))
2458 {
2459 char *replyTimeStr;
2460
2461 /* Copy because timestamptz_to_str returns a static buffer */
2462 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2463
2464 elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
2465 LSN_FORMAT_ARGS(writePtr),
2466 LSN_FORMAT_ARGS(flushPtr),
2467 LSN_FORMAT_ARGS(applyPtr),
2468 replyRequested ? " (reply requested)" : "",
2469 replyTimeStr);
2470
2471 pfree(replyTimeStr);
2472 }
2473
2474 /* See if we can compute the round-trip lag for these positions. */
2475 now = GetCurrentTimestamp();
2476 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2477 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2478 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2479
2480 /*
2481 * If the standby reports that it has fully replayed the WAL in two
2482 * consecutive reply messages, then the second such message must result
2483 * from wal_receiver_status_interval expiring on the standby. This is a
2484 * convenient time to forget the lag times measured when it last
2485 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2486 * until more WAL traffic arrives.
2487 */
2488 clearLagTimes = false;
2489 if (applyPtr == sentPtr)
2490 {
2491 if (fullyAppliedLastTime)
2492 clearLagTimes = true;
2493 fullyAppliedLastTime = true;
2494 }
2495 else
2496 fullyAppliedLastTime = false;
2497
2498 /* Send a reply if the standby requested one. */
2499 if (replyRequested)
2500 WalSndKeepalive(false, InvalidXLogRecPtr);
2501
2502 /*
2503 * Update shared state for this WalSender process based on reply data from
2504 * standby.
2505 */
2506 {
2507 WalSnd *walsnd = MyWalSnd;
2508
2509 SpinLockAcquire(&walsnd->mutex);
2510 walsnd->write = writePtr;
2511 walsnd->flush = flushPtr;
2512 walsnd->apply = applyPtr;
2513 if (writeLag != -1 || clearLagTimes)
2514 walsnd->writeLag = writeLag;
2515 if (flushLag != -1 || clearLagTimes)
2516 walsnd->flushLag = flushLag;
2517 if (applyLag != -1 || clearLagTimes)
2518 walsnd->applyLag = applyLag;
2519 walsnd->replyTime = replyTime;
2520 SpinLockRelease(&walsnd->mutex);
2521 }
2522
2523 if (!am_cascading_walsender)
2524 SyncRepReleaseWaiters();
2525
2526 /*
2527 * Advance our local xmin horizon when the client confirmed a flush.
2528 */
2529 if (MyReplicationSlot && XLogRecPtrIsValid(flushPtr))
2530 {
2531 if (SlotIsLogical(MyReplicationSlot))
2532 LogicalConfirmReceivedLocation(flushPtr);
2533 else
2534 PhysicalConfirmReceivedLocation(flushPtr);
2535 }
2536}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1811
#define SlotIsLogical(slot)
Definition: slot.h:285
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:474
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:173
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2402
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:4168
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4287

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsValid.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 482 of file walsender.c.

483{
484#define READ_REPLICATION_SLOT_COLS 3
485 ReplicationSlot *slot;
487 TupOutputState *tstate;
488 TupleDesc tupdesc;
490 bool nulls[READ_REPLICATION_SLOT_COLS];
491
493 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
494 TEXTOID, -1, 0);
495 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
496 TEXTOID, -1, 0);
497 /* TimeLineID is unsigned, so int4 is not wide enough. */
498 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
499 INT8OID, -1, 0);
500
501 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
502
503 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
504 slot = SearchNamedReplicationSlot(cmd->slotname, false);
505 if (slot == NULL || !slot->in_use)
506 {
507 LWLockRelease(ReplicationSlotControlLock);
508 }
509 else
510 {
511 ReplicationSlot slot_contents;
512 int i = 0;
513
514 /* Copy slot contents while holding spinlock */
515 SpinLockAcquire(&slot->mutex);
516 slot_contents = *slot;
517 SpinLockRelease(&slot->mutex);
518 LWLockRelease(ReplicationSlotControlLock);
519
520 if (OidIsValid(slot_contents.data.database))
522 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
523 errmsg("cannot use %s with a logical replication slot",
524 "READ_REPLICATION_SLOT"));
525
526 /* slot type */
527 values[i] = CStringGetTextDatum("physical");
528 nulls[i] = false;
529 i++;
530
531 /* start LSN */
532 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
533 {
534 char xloc[64];
535
536 snprintf(xloc, sizeof(xloc), "%X/%08X",
537 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
539 nulls[i] = false;
540 }
541 i++;
542
543 /* timeline this WAL was produced on */
544 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
545 {
546 TimeLineID slots_position_timeline;
547 TimeLineID current_timeline;
548 List *timeline_history = NIL;
549
550 /*
551 * While in recovery, use as timeline the currently-replaying one
552 * to get the LSN position's history.
553 */
554 if (RecoveryInProgress())
555 (void) GetXLogReplayRecPtr(&current_timeline);
556 else
557 current_timeline = GetWALInsertionTimeLine();
558
559 timeline_history = readTimeLineHistory(current_timeline);
560 slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
561 timeline_history);
562 values[i] = Int64GetDatum((int64) slots_position_timeline);
563 nulls[i] = false;
564 }
565 i++;
566
568 }
569
571 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
572 do_tup_output(tstate, values, nulls);
573 end_tup_output(tstate);
574}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
#define OidIsValid(objectId)
Definition: c.h:794
@ LW_SHARED
Definition: lwlock.h:113
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:540
Definition: pg_list.h:54
bool in_use
Definition: slot.h:186
#define READ_REPLICATION_SLOT_COLS

References Assert(), begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsValid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 581 of file walsender.c.

582{
584 TupleDesc tupdesc;
586 char histfname[MAXFNAMELEN];
587 char path[MAXPGPATH];
588 int fd;
589 off_t histfilelen;
590 off_t bytesleft;
591 Size len;
592
594
595 /*
596 * Reply with a result set with one row, and two columns. The first col is
597 * the name of the history file, 2nd is the contents.
598 */
599 tupdesc = CreateTemplateTupleDesc(2);
600 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
601 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
602
603 TLHistoryFileName(histfname, cmd->timeline);
604 TLHistoryFilePath(path, cmd->timeline);
605
606 /* Send a RowDescription message */
607 dest->rStartup(dest, CMD_SELECT, tupdesc);
608
609 /* Send a DataRow message */
611 pq_sendint16(&buf, 2); /* # of columns */
612 len = strlen(histfname);
613 pq_sendint32(&buf, len); /* col1 len */
614 pq_sendbytes(&buf, histfname, len);
615
616 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
617 if (fd < 0)
620 errmsg("could not open file \"%s\": %m", path)));
621
622 /* Determine file length and send it to client */
623 histfilelen = lseek(fd, 0, SEEK_END);
624 if (histfilelen < 0)
627 errmsg("could not seek to end of file \"%s\": %m", path)));
628 if (lseek(fd, 0, SEEK_SET) != 0)
631 errmsg("could not seek to beginning of file \"%s\": %m", path)));
632
633 pq_sendint32(&buf, histfilelen); /* col2 len */
634
635 bytesleft = histfilelen;
636 while (bytesleft > 0)
637 {
638 PGAlignedBlock rbuf;
639 int nread;
640
641 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
642 nread = read(fd, rbuf.data, sizeof(rbuf));
644 if (nread < 0)
647 errmsg("could not read file \"%s\": %m",
648 path)));
649 else if (nread == 0)
652 errmsg("could not read file \"%s\": read %d of %zu",
653 path, nread, (Size) bytesleft)));
654
655 pq_sendbytes(&buf, rbuf.data, nread);
656 bytesleft -= nread;
657 }
658
659 if (CloseTransientFile(fd) != 0)
662 errmsg("could not close file \"%s\": %m", path)));
663
665}
#define PG_BINARY
Definition: c.h:1258
size_t Size
Definition: c.h:625
int errcode_for_file_access(void)
Definition: elog.c:886
int CloseTransientFile(int fd)
Definition: fd.c:2851
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2674
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:275
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:42
#define MAXPGPATH
Size len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
int fd
Definition: walsender.c:588
#define PqMsg_DataRow
Definition: protocol.h:43
char data[BLCKSZ]
Definition: c.h:1116
TimeLineID timeline
Definition: replnodes.h:120
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd, len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1458 of file walsender.c.

1459{
1461 QueryCompletion qc;
1462
1463 /* make sure that our requirements are still fulfilled */
1465
1467
1468 ReplicationSlotAcquire(cmd->slotname, true, true);
1469
1470 /*
1471 * Force a disconnect, so that the decoding code doesn't need to care
1472 * about an eventual switch from running in recovery, to running in a
1473 * normal environment. Client code is expected to handle reconnects.
1474 */
1476 {
1477 ereport(LOG,
1478 (errmsg("terminating walsender process after promotion")));
1479 got_STOPPING = true;
1480 }
1481
1482 /*
1483 * Create our decoding context, making it start at the previously ack'ed
1484 * position.
1485 *
1486 * Do this before sending a CopyBothResponse message, so that any errors
1487 * are reported early.
1488 */
1490 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1492 .segment_open = WalSndSegmentOpen,
1493 .segment_close = wal_segment_close),
1497
1499
1500 /* Send a CopyBothResponse message, and start streaming */
1502 pq_sendbyte(&buf, 0);
1503 pq_sendint16(&buf, 0);
1505 pq_flush();
1506
1507 /* Start reading WAL from the oldest required WAL. */
1510
1511 /*
1512 * Report the location after which we'll send out further commits as the
1513 * current sentPtr.
1514 */
1516
1517 /* Also update the sent position status in shared memory */
1521
1522 replication_active = true;
1523
1525
1526 /* Main loop of walsender */
1528
1531
1532 replication_active = false;
1533 if (got_STOPPING)
1534 proc_exit(0);
1536
1537 /* Get out of COPY mode (CommandComplete). */
1538 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1539 EndCommand(&qc, DestRemote, false);
1540}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:169
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:489
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:620
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:97
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2880
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:216
static void XLogSendLogical(void)
Definition: walsender.c:3502
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:232

References am_cascading_walsender, Assert(), buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 813 of file walsender.c.

814{
816 XLogRecPtr FlushPtr;
817 TimeLineID FlushTLI;
818
819 /* create xlogreader for physical replication */
820 xlogreader =
822 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
823 .segment_close = wal_segment_close),
824 NULL);
825
826 if (!xlogreader)
828 (errcode(ERRCODE_OUT_OF_MEMORY),
829 errmsg("out of memory"),
830 errdetail("Failed while allocating a WAL reading processor.")));
831
832 /*
833 * We assume here that we're logging enough information in the WAL for
834 * log-shipping, since this is checked in PostmasterMain().
835 *
836 * NOTE: wal_level can only change at shutdown, so in most cases it is
837 * difficult for there to be WAL data that we can still see that was
838 * written at wal_level='minimal'.
839 */
840
841 if (cmd->slotname)
842 {
843 ReplicationSlotAcquire(cmd->slotname, true, true);
846 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
847 errmsg("cannot use a logical replication slot for physical replication")));
848
849 /*
850 * We don't need to verify the slot's restart_lsn here; instead we
851 * rely on the caller requesting the starting point to use. If the
852 * WAL segment doesn't exist, we'll fail later.
853 */
854 }
855
856 /*
857 * Select the timeline. If it was given explicitly by the client, use
858 * that. Otherwise use the timeline of the last replayed record.
859 */
862 FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
863 else
864 FlushPtr = GetFlushRecPtr(&FlushTLI);
865
866 if (cmd->timeline != 0)
867 {
868 XLogRecPtr switchpoint;
869
870 sendTimeLine = cmd->timeline;
871 if (sendTimeLine == FlushTLI)
872 {
875 }
876 else
877 {
878 List *timeLineHistory;
879
881
882 /*
883 * Check that the timeline the client requested exists, and the
884 * requested start location is on that timeline.
885 */
886 timeLineHistory = readTimeLineHistory(FlushTLI);
887 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
889 list_free_deep(timeLineHistory);
890
891 /*
892 * Found the requested timeline in the history. Check that
893 * requested startpoint is on that timeline in our history.
894 *
895 * This is quite loose on purpose. We only check that we didn't
896 * fork off the requested timeline before the switchpoint. We
897 * don't check that we switched *to* it before the requested
898 * starting point. This is because the client can legitimately
899 * request to start replication from the beginning of the WAL
900 * segment that contains switchpoint, but on the new timeline, so
901 * that it doesn't end up with a partial segment. If you ask for
902 * too old a starting point, you'll get an error later when we
903 * fail to find the requested WAL segment in pg_wal.
904 *
905 * XXX: we could be more strict here and only allow a startpoint
906 * that's older than the switchpoint, if it's still in the same
907 * WAL segment.
908 */
909 if (XLogRecPtrIsValid(switchpoint) &&
910 switchpoint < cmd->startpoint)
911 {
913 errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
915 cmd->timeline),
916 errdetail("This server's history forked from timeline %u at %X/%08X.",
917 cmd->timeline,
918 LSN_FORMAT_ARGS(switchpoint)));
919 }
920 sendTimeLineValidUpto = switchpoint;
921 }
922 }
923 else
924 {
925 sendTimeLine = FlushTLI;
928 }
929
931
932 /* If there is nothing to stream, don't even enter COPY mode */
934 {
935 /*
936 * When we first start replication the standby will be behind the
937 * primary. For some applications, for example synchronous
938 * replication, it is important to have a clear state for this initial
939 * catchup mode, so we can trigger actions when we change streaming
940 * state later. We may stay in this state for a long time, which is
941 * exactly why we want to be able to monitor whether or not we are
942 * still here.
943 */
945
946 /* Send a CopyBothResponse message, and start streaming */
948 pq_sendbyte(&buf, 0);
949 pq_sendint16(&buf, 0);
951 pq_flush();
952
953 /*
954 * Don't allow a request to stream from a future point in WAL that
955 * hasn't been flushed to disk in this server yet.
956 */
957 if (FlushPtr < cmd->startpoint)
958 {
960 errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
962 LSN_FORMAT_ARGS(FlushPtr)));
963 }
964
965 /* Start streaming from the requested point */
966 sentPtr = cmd->startpoint;
967
968 /* Initialize shared memory status, too */
972
974
975 /* Main loop of walsender */
976 replication_active = true;
977
979
980 replication_active = false;
981 if (got_STOPPING)
982 proc_exit(0);
984
986 }
987
988 if (cmd->slotname)
990
991 /*
992 * Copy is finished now. Send a single-row result set indicating the next
993 * timeline.
994 */
996 {
997 char startpos_str[8 + 1 + 8 + 1];
999 TupOutputState *tstate;
1000 TupleDesc tupdesc;
1001 Datum values[2];
1002 bool nulls[2] = {0};
1003
1004 snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
1006
1008
1009 /*
1010 * Need a tuple descriptor representing two columns. int8 may seem
1011 * like a surprising data type for this, but in theory int4 would not
1012 * be wide enough for this, as TimeLineID is unsigned.
1013 */
1014 tupdesc = CreateTemplateTupleDesc(2);
1015 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1016 INT8OID, -1, 0);
1017 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1018 TEXTOID, -1, 0);
1019
1020 /* prepare for projection of tuple */
1021 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1022
1024 values[1] = CStringGetTextDatum(startpos_str);
1025
1026 /* send it to dest */
1027 do_tup_output(tstate, values, nulls);
1028
1029 end_tup_output(tstate);
1030 }
1031
1032 /* Send CommandComplete message */
1033 EndReplicationCommand("START_STREAMING");
1034}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1216
void list_free_deep(List *list)
Definition: list.c:1560
TimeLineID timeline
Definition: replnodes.h:96
static void XLogSendPhysical(void)
Definition: walsender.c:3192
int wal_segment_size
Definition: xlog.c:146
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:107

References am_cascading_walsender, Assert(), begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsValid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2589 of file walsender.c.

2590{
2591 FullTransactionId nextFullXid;
2592 TransactionId nextXid;
2593 uint32 nextEpoch;
2594
2595 nextFullXid = ReadNextFullTransactionId();
2596 nextXid = XidFromFullTransactionId(nextFullXid);
2597 nextEpoch = EpochFromFullTransactionId(nextFullXid);
2598
2599 if (xid <= nextXid)
2600 {
2601 if (epoch != nextEpoch)
2602 return false;
2603 }
2604 else
2605 {
2606 if (epoch + 1 != nextEpoch)
2607 return false;
2608 }
2609
2610 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2611 return false; /* epoch OK, but it's wrapped around */
2612
2613 return true;
2614}
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.h:282
#define XidFromFullTransactionId(x)
Definition: transam.h:48
uint32 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 671 of file walsender.c.

672{
673 MemoryContext mcxt;
675 off_t offset = 0;
677
678 /*
679 * parsing the manifest will use the cryptohash stuff, which requires a
680 * resource owner
681 */
684 CurrentResourceOwner == NULL);
686
687 /* Prepare to read manifest data into a temporary context. */
689 "incremental backup information",
692
693 /* Send a CopyInResponse message */
695 pq_sendbyte(&buf, 0);
696 pq_sendint16(&buf, 0);
698 pq_flush();
699
700 /* Receive packets from client until done. */
701 while (HandleUploadManifestPacket(&buf, &offset, ib))
702 ;
703
704 /* Finish up manifest processing. */
706
707 /*
708 * Discard any old manifest information and arrange to preserve the new
709 * information we just got.
710 *
711 * We assume that MemoryContextDelete and MemoryContextSetParent won't
712 * fail, and thus we shouldn't end up bailing out of here in such a way as
713 * to leave dangling pointers.
714 */
715 if (uploaded_manifest_mcxt != NULL)
720
721 /* clean up the resource owner we created */
723}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:686
MemoryContext CacheMemoryContext
Definition: mcxt.c:169
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:472
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:313
#define PqMsg_CopyInResponse
Definition: protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1016
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition: resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:737
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:156

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2853 of file walsender.c.

2854{
2855 TimestampTz timeout;
2856
2857 /* don't bail out if we're doing something that doesn't require timeouts */
2858 if (last_reply_timestamp <= 0)
2859 return;
2860
2863
2864 if (wal_sender_timeout > 0 && last_processing >= timeout)
2865 {
2866 /*
2867 * Since typically expiration of replication timeout means
2868 * communication problem, we don't send the error message to the
2869 * standby.
2870 */
2872 (errmsg("terminating walsender process due to replication timeout")));
2873
2875 }
2876}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:131

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2809 of file walsender.c.

2810{
2811 long sleeptime = 10000; /* 10 s */
2812
2814 {
2815 TimestampTz wakeup_time;
2816
2817 /*
2818 * At the latest stop sleeping once wal_sender_timeout has been
2819 * reached.
2820 */
2823
2824 /*
2825 * If no ping has been sent yet, wakeup when it's time to do so.
2826 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2827 * the timeout passed without a response.
2828 */
2831 wal_sender_timeout / 2);
2832
2833 /* Compute relative time until wakeup. */
2834 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2835 }
2836
2837 return sleeptime;
2838}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3595 of file walsender.c.

3596{
3597 XLogRecPtr replicatedPtr;
3598
3599 /* ... let's just be real sure we're caught up ... */
3600 send_data();
3601
3602 /*
3603 * To figure out whether all WAL has successfully been replicated, check
3604 * flush location if valid, write otherwise. Tools like pg_receivewal will
3605 * usually (unless in synchronous mode) return an invalid flush location.
3606 */
3607 replicatedPtr = XLogRecPtrIsValid(MyWalSnd->flush) ?
3609
3610 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3612 {
3613 QueryCompletion qc;
3614
3615 /* Inform the standby that XLOG streaming is done */
3616 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3617 EndCommand(&qc, DestRemote, false);
3618 pq_flush();
3619
3620 proc_exit(0);
3621 }
3624}
static bool WalSndCaughtUp
Definition: walsender.c:202

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsValid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 348 of file walsender.c.

349{
354
355 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
357
358 if (MyReplicationSlot != NULL)
360
362
363 replication_active = false;
364
365 /*
366 * If there is a transaction in progress, it will clean up our
367 * ResourceOwner, but if a replication command set up a resource owner
368 * without a transaction, we've got to clean that up now.
369 */
372
374 proc_exit(0);
375
376 /* Revert back to startup state */
378}
void pgaio_error_cleanup(void)
Definition: aio.c:1165
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1949
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:857
WALOpenSegment seg
Definition: xlogreader.h:271
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:205
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5011

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 3962 of file walsender.c.

3963{
3964 switch (state)
3965 {
3967 return "startup";
3968 case WALSNDSTATE_BACKUP:
3969 return "backup";
3971 return "catchup";
3973 return "streaming";
3975 return "stopping";
3976 }
3977 return "UNKNOWN";
3978}
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3879 of file walsender.c.

3880{
3881 int i;
3882
3883 for (i = 0; i < max_wal_senders; i++)
3884 {
3885 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3886 pid_t pid;
3887
3888 SpinLockAcquire(&walsnd->mutex);
3889 pid = walsnd->pid;
3890 SpinLockRelease(&walsnd->mutex);
3891
3892 if (pid == 0)
3893 continue;
3894
3896 }
3897}
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 4168 of file walsender.c.

4169{
4170 elog(DEBUG2, "sending replication keepalive");
4171
4172 /* construct the message... */
4175 pq_sendint64(&output_message, XLogRecPtrIsValid(writePtr) ? writePtr : sentPtr);
4177 pq_sendbyte(&output_message, requestReply ? 1 : 0);
4178
4179 /* ... and send it wrapped in CopyData */
4181
4182 /* Set local flag */
4183 if (requestReply)
4185}
#define PqReplMsg_Keepalive
Definition: protocol.h:75

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_Keepalive, resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4191 of file walsender.c.

4192{
4193 TimestampTz ping_time;
4194
4195 /*
4196 * Don't send keepalive messages if timeouts are globally disabled or
4197 * we're doing something not partaking in timeouts.
4198 */
4200 return;
4201
4203 return;
4204
4205 /*
4206 * If half of wal_sender_timeout has lapsed without receiving any reply
4207 * from the standby, send a keep-alive message to the standby requesting
4208 * an immediate reply.
4209 */
4211 wal_sender_timeout / 2);
4212 if (last_processing >= ping_time)
4213 {
4215
4216 /* Try to flush pending output to the client */
4217 if (pq_flush_if_writable() != 0)
4219 }
4220}

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3098 of file walsender.c.

3099{
3100 WalSnd *walsnd = MyWalSnd;
3101
3102 Assert(walsnd != NULL);
3103
3104 MyWalSnd = NULL;
3105
3106 SpinLockAcquire(&walsnd->mutex);
3107 /* Mark WalSnd struct as no longer being in use. */
3108 walsnd->pid = 0;
3109 SpinLockRelease(&walsnd->mutex);
3110}

References Assert(), WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3716 of file walsender.c.

3717{
3718 got_SIGUSR2 = true;
3720}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2880 of file walsender.c.

2881{
2882 TimestampTz last_flush = 0;
2883
2884 /*
2885 * Initialize the last reply timestamp. That enables timeout processing
2886 * from hereon.
2887 */
2890
2891 /*
2892 * Loop until we reach the end of this timeline or the client requests to
2893 * stop streaming.
2894 */
2895 for (;;)
2896 {
2897 /* Clear any already-pending wakeups */
2899
2901
2902 /* Process any requests or signals received recently */
2904 {
2905 ConfigReloadPending = false;
2908 }
2909
2910 /* Check for input from the client */
2912
2913 /*
2914 * If we have received CopyDone from the client, sent CopyDone
2915 * ourselves, and the output buffer is empty, it's time to exit
2916 * streaming.
2917 */
2920 break;
2921
2922 /*
2923 * If we don't have any pending data in the output buffer, try to send
2924 * some more. If there is some, we don't bother to call send_data
2925 * again until we've flushed it ... but we'd better assume we are not
2926 * caught up.
2927 */
2928 if (!pq_is_send_pending())
2929 send_data();
2930 else
2931 WalSndCaughtUp = false;
2932
2933 /* Try to flush pending output to the client */
2934 if (pq_flush_if_writable() != 0)
2936
2937 /* If nothing remains to be sent right now ... */
2939 {
2940 /*
2941 * If we're in catchup state, move to streaming. This is an
2942 * important state change for users to know about, since before
2943 * this point data loss might occur if the primary dies and we
2944 * need to failover to the standby. The state change is also
2945 * important for synchronous replication, since commits that
2946 * started to wait at that point might wait for some time.
2947 */
2949 {
2951 (errmsg_internal("\"%s\" has now caught up with upstream server",
2954 }
2955
2956 /*
2957 * When SIGUSR2 arrives, we send any outstanding logs up to the
2958 * shutdown checkpoint record (i.e., the latest record), wait for
2959 * them to be replicated to the standby, and exit. This may be a
2960 * normal termination at shutdown, or a promotion, the walsender
2961 * is not sure which.
2962 */
2963 if (got_SIGUSR2)
2964 WalSndDone(send_data);
2965 }
2966
2967 /* Check for replication timeout. */
2969
2970 /* Send keepalive if the time has come */
2972
2973 /*
2974 * Block if we have unsent data. XXX For logical replication, let
2975 * WalSndWaitForWal() handle any other blocking; idle receivers need
2976 * its additional actions. For physical replication, also block if
2977 * caught up; its send_data does not block.
2978 *
2979 * The IO statistics are reported in WalSndWaitForWal() for the
2980 * logical WAL senders.
2981 */
2982 if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2985 {
2986 long sleeptime;
2987 int wakeEvents;
2989
2991 wakeEvents = WL_SOCKET_READABLE;
2992 else
2993 wakeEvents = 0;
2994
2995 /*
2996 * Use fresh timestamp, not last_processing, to reduce the chance
2997 * of reaching wal_sender_timeout before sending a keepalive.
2998 */
3000 sleeptime = WalSndComputeSleeptime(now);
3001
3002 if (pq_is_send_pending())
3003 wakeEvents |= WL_SOCKET_WRITEABLE;
3004
3005 /* Report IO statistics, if needed */
3006 if (TimestampDifferenceExceeds(last_flush, now,
3008 {
3009 pgstat_flush_io(false);
3011 last_flush = now;
3012 }
3013
3014 /* Sleep until something happens or we time out */
3015 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
3016 }
3017 }
3018}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
char * application_name
Definition: guc_tables.c:561
bool pgstat_flush_backend(bool nowait, bits32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition: pgstat_io.c:175
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition: walsender.c:103
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3595

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1551 of file walsender.c.

1552{
1553 /* can't have sync rep confused by sending the same LSN several times */
1554 if (!last_write)
1555 lsn = InvalidXLogRecPtr;
1556
1557 resetStringInfo(ctx->out);
1558
1560 pq_sendint64(ctx->out, lsn); /* dataStart */
1561 pq_sendint64(ctx->out, lsn); /* walEnd */
1562
1563 /*
1564 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1565 * reserve space here.
1566 */
1567 pq_sendint64(ctx->out, 0); /* sendtime */
1568}
#define PqReplMsg_WALData
Definition: protocol.h:77
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), PqReplMsg_WALData, and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3671 of file walsender.c.

/*
 * WalSndRqstFileReload -- set the needreload flag on every active walsender.
 *
 * Scans all max_wal_senders slots.  Unused slots (pid == 0) are skipped;
 * for each in-use slot, needreload is set to true while holding that
 * slot's spinlock.  How a walsender reacts to the flag is not shown here;
 * given the caller (KeepFileRestoredFromArchive()), presumably it makes
 * the walsender reopen the WAL file it is reading -- confirm.
 */
3672{
3673 int i;
3674
3675 for (i = 0; i < max_wal_senders; i++)
3676 {
3677 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3678
3679 SpinLockAcquire(&walsnd->mutex); /* pid and needreload are accessed under the slot mutex */
3680 if (walsnd->pid == 0) /* slot not in use: nothing to notify */
3681 {
3682 SpinLockRelease(&walsnd->mutex);
3683 continue;
3684 }
3685 walsnd->needreload = true;
3686 SpinLockRelease(&walsnd->mutex);
3687 }
3688}

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 3114 of file walsender.c.

3116{
3117 char path[MAXPGPATH];
3118
3119 /*-------
3120 * When reading from a historic timeline, and there is a timeline switch
3121 * within this segment, read from the WAL segment belonging to the new
3122 * timeline.
3123 *
3124 * For example, imagine that this server is currently on timeline 5, and
3125 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3126 * 0/13002088. In pg_wal, we have these files:
3127 *
3128 * ...
3129 * 000000040000000000000012
3130 * 000000040000000000000013
3131 * 000000050000000000000013
3132 * 000000050000000000000014
3133 * ...
3134 *
3135 * In this situation, when requested to send the WAL from segment 0x13, on
3136 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3137 * recovery prefers files from newer timelines, so if the segment was
3138 * restored from the archive on this server, the file belonging to the old
3139 * timeline, 000000040000000000000013, might not exist. Their contents are
3140 * equal up to the switchpoint, because at a timeline switch, the used
3141 * portion of the old segment is copied to the new file.
3142 */
3143 *tli_p = sendTimeLine;
3145 {
3146 XLogSegNo endSegNo;
3147
3148 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3149 if (nextSegNo == endSegNo)
3150 *tli_p = sendTimeLineNextTLI;
3151 }
3152
3153 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3154 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3155 if (state->seg.ws_file >= 0)
3156 return;
3157
3158 /*
3159 * If the file is not found, assume it's because the standby asked for a
3160 * too old WAL segment that has already been removed or recycled.
3161 */
3162 if (errno == ENOENT)
3163 {
3164 char xlogfname[MAXFNAMELEN];
3165 int save_errno = errno;
3166
3167 XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
3168 errno = save_errno;
3169 ereport(ERROR,
3171 errmsg("requested WAL segment %s has already been removed",
3172 xlogfname)));
3173 }
3174 else
3175 ereport(ERROR,
3177 errmsg("could not open file \"%s\": %m",
3178 path)));
3179}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1086
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3943 of file walsender.c.

3944{
3945 WalSnd *walsnd = MyWalSnd;
3946
3948
3949 if (walsnd->state == state)
3950 return;
3951
3952 SpinLockAcquire(&walsnd->mutex);
3953 walsnd->state = state;
3954 SpinLockRelease(&walsnd->mutex);
3955}

References am_walsender, Assert(), WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3755 of file walsender.c.

3756{
3757 bool found;
3758 int i;
3759
3761 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3762
3763 if (!found)
3764 {
3765 /* First time through, so initialize */
3767
3768 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3770
3771 for (i = 0; i < max_wal_senders; i++)
3772 {
3773 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3774
3775 SpinLockInit(&walsnd->mutex);
3776 }
3777
3781 }
3782}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3743

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3743 of file walsender.c.

/*
 * WalSndShmemSize -- number of bytes of shared memory needed for WalSndCtl.
 *
 * The result is the fixed part of WalSndCtlData (everything before the
 * trailing walsnds array) plus one WalSnd per max_wal_senders.  The sum
 * and product go through add_size()/mul_size() from shmem.c rather than
 * plain arithmetic; presumably those guard against Size overflow -- see
 * their definitions.  Used by CalculateShmemSize() and WalSndShmemInit().
 */
3744{
3745 Size size = 0;
3746
3747 size = offsetof(WalSndCtlData, walsnds); /* header up to the per-walsender array */
3748 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3749
3750 return size;
3751}
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 384 of file walsender.c.

385{
386 /*
387 * Reset whereToSendOutput to prevent ereport from attempting to send any
388 * more messages to the standby.
389 */
392
393 proc_exit(0);
394 abort(); /* keep the compiler quiet */
395}
@ DestNone
Definition: dest.h:87
CommandDest whereToSendOutput
Definition: postgres.c:92

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3724 of file walsender.c.

3725{
3726 /* Set up signal handlers */
3728 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3729 pqsignal(SIGTERM, die); /* request shutdown */
3730 /* SIGQUIT handler was already set up by InitPostmasterChild */
3731 InitializeTimeouts(); /* establishes SIGALRM handler */
3732 pqsignal(SIGPIPE, SIG_IGN);
3734 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3735 * shutdown */
3736
3737 /* Reset some signals that are accepted by postmaster but not here */
3738 pqsignal(SIGCHLD, SIG_DFL);
3739}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
Definition: pg_test_fsync.c:99
#define pqsignal
Definition: port.h:551
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3062
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:677
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3716
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1674 of file walsender.c.

1676{
1677 static TimestampTz sendTime = 0;
1679 bool pending_writes = false;
1680 bool end_xact = ctx->end_xact;
1681
1682 /*
1683 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1684 * avoid flooding the lag tracker when we commit frequently.
1685 *
1686 * We don't have a mechanism to get the ack for any LSN other than end
1687 * xact LSN from the downstream. So, we track lag only for end of
1688 * transaction LSN.
1689 */
1690#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1691 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1693 {
1694 LagTrackerWrite(lsn, now);
1695 sendTime = now;
1696 }
1697
1698 /*
1699 * When skipping empty transactions in synchronous replication, we send a
1700 * keepalive message to avoid delaying such transactions.
1701 *
1702 * It is okay to check sync_standbys_status without lock here as in the
1703 * worst case we will just send an extra keepalive message when it is
1704 * really not required.
1705 */
1706 if (skipped_xact &&
1707 SyncRepRequested() &&
1708 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1709 {
1710 WalSndKeepalive(false, lsn);
1711
1712 /* Try to flush pending output to the client */
1713 if (pq_flush_if_writable() != 0)
1715
1716 /* If we have pending write here, make sure it's actually flushed */
1717 if (pq_is_send_pending())
1718 pending_writes = true;
1719 }
1720
1721 /*
1722 * Process pending writes if any or try to send a keepalive if required.
1723 * We don't need to try sending keep alive messages at the transaction end
1724 * as that will be done at a later point in time. This is required only
1725 * for large transactions where we don't send any changes to the
1726 * downstream and the receiver can timeout due to that.
1727 */
1728 if (pending_writes || (!end_xact &&
1730 wal_sender_timeout / 2)))
1732}
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1620
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:4229
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3821 of file walsender.c.

3822{
3823 WaitEvent event;
3824
3825 ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3826
3827 /*
3828 * We use a condition variable to efficiently wake up walsenders in
3829 * WalSndWakeup().
3830 *
3831 * Every walsender prepares to sleep on a shared memory CV. Note that it
3832 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3833 * waitlist), but does not actually wait on the CV (IOW, it never calls
3834 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3835 * waiting, because we also need to wait for socket events. The processes
3836 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3837 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3838 * walsenders come out of WaitEventSetWait().
3839 *
3840 * This approach is simple and efficient because, one doesn't have to loop
3841 * through all the walsenders slots, with a spinlock acquisition and
3842 * release for every iteration, just to wake up only the waiting
3843 * walsenders. It makes WalSndWakeup() callers' life easy.
3844 *
3845 * XXX: A desirable future improvement would be to add support for CVs
3846 * into WaitEventSetWait().
3847 *
3848 * And, we use separate shared memory CVs for physical and logical
3849 * walsenders for selective wake ups, see WalSndWakeup() for more details.
3850 *
3851 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3852 * until awakened by physical walsenders after the walreceiver confirms
3853 * the receipt of the LSN.
3854 */
3855 if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
3861
3862 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3863 (event.events & WL_POSTMASTER_DEATH))
3864 {
3866 proc_exit(1);
3867 }
3868
3870}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:166
uint32 events
Definition: waiteventset.h:62
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: waiteventset.c:655
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
Definition: waiteventset.h:38

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1824 of file walsender.c.

1825{
1826 int wakeEvents;
1827 uint32 wait_event = 0;
1828 static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1829 TimestampTz last_flush = 0;
1830
1831 /*
1832 * Fast path to avoid acquiring the spinlock in case we already know we
1833 * have enough WAL available and all the standby servers have confirmed
1834 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1835 * if we're far behind.
1836 */
1837 if (XLogRecPtrIsValid(RecentFlushPtr) &&
1838 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1839 return RecentFlushPtr;
1840
1841 /*
1842 * Within the loop, we wait for the necessary WALs to be flushed to disk
1843 * first, followed by waiting for standbys to catch up if there are enough
1844 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1845 */
1846 for (;;)
1847 {
1848 bool wait_for_standby_at_stop = false;
1849 long sleeptime;
1851
1852 /* Clear any already-pending wakeups */
1854
1856
1857 /* Process any requests or signals received recently */
1859 {
1860 ConfigReloadPending = false;
1863 }
1864
1865 /* Check for input from the client */
1867
1868 /*
1869 * If we're shutting down, trigger pending WAL to be written out,
1870 * otherwise we'd possibly end up waiting for WAL that never gets
1871 * written, because walwriter has shut down already.
1872 */
1873 if (got_STOPPING)
1875
1876 /*
1877 * To avoid the scenario where standbys need to catch up to a newer
1878 * WAL location in each iteration, we update our idea of the currently
1879 * flushed position only if we are not waiting for standbys to catch
1880 * up.
1881 */
1882 if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1883 {
1884 if (!RecoveryInProgress())
1885 RecentFlushPtr = GetFlushRecPtr(NULL);
1886 else
1887 RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1888 }
1889
1890 /*
1891 * If postmaster asked us to stop and the standby slots have caught up
1892 * to the flushed position, don't wait anymore.
1893 *
1894 * It's important to do this check after the recomputation of
1895 * RecentFlushPtr, so we can send all remaining data before shutting
1896 * down.
1897 */
1898 if (got_STOPPING)
1899 {
1900 if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
1901 wait_for_standby_at_stop = true;
1902 else
1903 break;
1904 }
1905
1906 /*
1907 * We only send regular messages to the client for full decoded
1908 * transactions, but a synchronous replication and walsender shutdown
1909 * possibly are waiting for a later location. So, before sleeping, we
1910 * send a ping containing the flush location. If the receiver is
1911 * otherwise idle, this keepalive will trigger a reply. Processing the
1912 * reply will update these MyWalSnd locations.
1913 */
1914 if (MyWalSnd->flush < sentPtr &&
1915 MyWalSnd->write < sentPtr &&
1918
1919 /*
1920 * Exit the loop if already caught up and doesn't need to wait for
1921 * standby slots.
1922 */
1923 if (!wait_for_standby_at_stop &&
1924 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1925 break;
1926
1927 /*
1928 * Waiting for new WAL or waiting for standbys to catch up. Since we
1929 * need to wait, we're now caught up.
1930 */
1931 WalSndCaughtUp = true;
1932
1933 /*
1934 * Try to flush any pending output to the client.
1935 */
1936 if (pq_flush_if_writable() != 0)
1938
1939 /*
1940 * If we have received CopyDone from the client, sent CopyDone
1941 * ourselves, and the output buffer is empty, it's time to exit
1942 * streaming, so fail the current WAL fetch request.
1943 */
1946 break;
1947
1948 /* die if timeout was reached */
1950
1951 /* Send keepalive if the time has come */
1953
1954 /*
1955 * Sleep until something happens or we time out. Also wait for the
1956 * socket becoming writable, if there's still pending output.
1957 * Otherwise we might sit on sendable output data while waiting for
1958 * new WAL to be generated. (But if we have nothing to send, we don't
1959 * want to wake on socket-writable.)
1960 */
1962 sleeptime = WalSndComputeSleeptime(now);
1963
1964 wakeEvents = WL_SOCKET_READABLE;
1965
1966 if (pq_is_send_pending())
1967 wakeEvents |= WL_SOCKET_WRITEABLE;
1968
1969 Assert(wait_event != 0);
1970
1971 /* Report IO statistics, if needed */
1972 if (TimestampDifferenceExceeds(last_flush, now,
1974 {
1975 pgstat_flush_io(false);
1977 last_flush = now;
1978 }
1979
1980 WalSndWait(wakeEvents, sleeptime, wait_event);
1981 }
1982
1983 /* reactivate latch so WalSndLoop knows to continue */
1985 return RecentFlushPtr;
1986}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1796
bool XLogBackgroundFlush(void)
Definition: xlog.c:2989

References Assert(), CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsValid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3905 of file walsender.c.

/*
 * WalSndWaitStopping -- block until every active walsender reports STOPPING.
 *
 * Repeatedly scans all max_wal_senders slots.  Unused slots (pid == 0) are
 * ignored.  If any in-use slot is in a state other than
 * WALSNDSTATE_STOPPING, the scan is abandoned early, the process sleeps
 * 10 ms, and it rescans.  It returns only once every in-use slot is
 * STOPPING; there is no timeout.  Called from ShutdownXLOG().
 */
3906{
3907 for (;;)
3908 {
3909 int i;
3910 bool all_stopped = true;
3911
3912 for (i = 0; i < max_wal_senders; i++)
3913 {
3914 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3915
3916 SpinLockAcquire(&walsnd->mutex); /* pid and state are read under the slot mutex */
3917
3918 if (walsnd->pid == 0) /* unused slot: nothing to wait for */
3919 {
3920 SpinLockRelease(&walsnd->mutex);
3921 continue;
3922 }
3923
3924 if (walsnd->state != WALSNDSTATE_STOPPING)
3925 {
3926 all_stopped = false;
3927 SpinLockRelease(&walsnd->mutex);
3928 break; /* one walsender still running is enough to keep waiting */
3929 }
3930 SpinLockRelease(&walsnd->mutex);
3931 }
3932
3933 /* safe to leave if confirmation is done for all WAL senders */
3934 if (all_stopped)
3935 return;
3936
3937 pg_usleep(10000L); /* wait for 10 msec */
3938 }
3939}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3800 of file walsender.c.

3801{
3802 /*
3803 * Wake up all the walsenders waiting on WAL being flushed or replayed
3804 * respectively. Note that waiting walsender would have prepared to sleep
3805 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3806 * before actually waiting.
3807 */
3808 if (physical)
3810
3811 if (logical)
3813}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1578 of file walsender.c.

1580{
1582
1583 /*
1584 * Fill the send timestamp last, so that it is taken as late as possible.
1585 * This is somewhat ugly, but the protocol is set as it's already used for
1586 * several releases by streaming physical replication.
1587 */
1591 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1592 tmpbuf.data, sizeof(int64));
1593
1594 /* output previously gathered data in a CopyData packet */
1596
1598
1599 /* Try to flush pending output to the client */
1600 if (pq_flush_if_writable() != 0)
1602
1603 /* Try taking fast path unless we get too close to walsender timeout. */
1605 wal_sender_timeout / 2) &&
1607 {
1608 return;
1609 }
1610
1611 /* If we have pending write here, go to slow path */
1613}

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), PqMsg_CopyData, ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3502 of file walsender.c.

3503{
3504 XLogRecord *record;
3505 char *errm;
3506
3507 /*
3508 * We'll use the current flush point to determine whether we've caught up.
3509 * This variable is static in order to cache it across calls. Caching is
3510 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3511 * spinlock.
3512 */
3513 static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3514
3515 /*
3516 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3517 * true in WalSndWaitForWal, if we're actually waiting. We also set to
3518 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3519 * didn't wait - i.e. when we're shutting down.
3520 */
3521 WalSndCaughtUp = false;
3522
3523 record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3524
3525 /* xlog record was invalid */
3526 if (errm != NULL)
3527 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3528 errm);
3529
3530 if (record != NULL)
3531 {
3532 /*
3533 * Note the lack of any call to LagTrackerWrite() which is handled by
3534 * WalSndUpdateProgress which is called by output plugin through
3535 * logical decoding write api.
3536 */
3538
3540 }
3541
3542 /*
3543 * If first time through in this session, initialize flushPtr. Otherwise,
3544 * we only need to update flushPtr if EndRecPtr is past it.
3545 */
3546 if (!XLogRecPtrIsValid(flushPtr) ||
3548 {
3549 /*
3550 * For cascading logical WAL senders, we use the replay LSN instead of
3551 * the flush LSN, since logical decoding on a standby only processes
3552 * WAL that has been replayed. This distinction becomes particularly
3553 * important during shutdown, as new WAL is no longer replayed and the
3554 * last replayed LSN marks the furthest point up to which decoding can
3555 * proceed.
3556 */
3558 flushPtr = GetXLogReplayRecPtr(NULL);
3559 else
3560 flushPtr = GetFlushRecPtr(NULL);
3561 }
3562
3563 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3564 if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3565 WalSndCaughtUp = true;
3566
3567 /*
3568 * If we're caught up and have been requested to stop, have WalSndLoop()
3569 * terminate the connection in an orderly manner, after writing out all
3570 * the pending data.
3571 */
3573 got_SIGUSR2 = true;
3574
3575 /* Update shared memory status */
3576 {
3577 WalSnd *walsnd = MyWalSnd;
3578
3579 SpinLockAcquire(&walsnd->mutex);
3580 walsnd->sentPtr = sentPtr;
3581 SpinLockRelease(&walsnd->mutex);
3582 }
3583}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:88
XLogRecPtr EndRecPtr
Definition: xlogreader.h:206
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:390

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3192 of file walsender.c.

3193{
3194 XLogRecPtr SendRqstPtr;
3195 XLogRecPtr startptr;
3196 XLogRecPtr endptr;
3197 Size nbytes;
3198 XLogSegNo segno;
3199 WALReadError errinfo;
3200 Size rbytes;
3201
3202 /* If requested switch the WAL sender to the stopping state. */
3203 if (got_STOPPING)
3205
3207 {
3208 WalSndCaughtUp = true;
3209 return;
3210 }
3211
3212 /* Figure out how far we can safely send the WAL. */
3214 {
3215 /*
3216 * Streaming an old timeline that's in this server's history, but is
3217 * not the one we're currently inserting or replaying. It can be
3218 * streamed up to the point where we switched off that timeline.
3219 */
3220 SendRqstPtr = sendTimeLineValidUpto;
3221 }
3222 else if (am_cascading_walsender)
3223 {
3224 TimeLineID SendRqstTLI;
3225
3226 /*
3227 * Streaming the latest timeline on a standby.
3228 *
3229 * Attempt to send all WAL that has already been replayed, so that we
3230 * know it's valid. If we're receiving WAL through streaming
3231 * replication, it's also OK to send any WAL that has been received
3232 * but not replayed.
3233 *
3234 * The timeline we're recovering from can change, or we can be
3235 * promoted. In either case, the current timeline becomes historic. We
3236 * need to detect that so that we don't try to stream past the point
3237 * where we switched to another timeline. We check for promotion or
3238 * timeline switch after calculating FlushPtr, to avoid a race
3239 * condition: if the timeline becomes historic just after we checked
3240 * that it was still current, it's still be OK to stream it up to the
3241 * FlushPtr that was calculated before it became historic.
3242 */
3243 bool becameHistoric = false;
3244
3245 SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3246
3247 if (!RecoveryInProgress())
3248 {
3249 /* We have been promoted. */
3250 SendRqstTLI = GetWALInsertionTimeLine();
3251 am_cascading_walsender = false;
3252 becameHistoric = true;
3253 }
3254 else
3255 {
3256 /*
3257 * Still a cascading standby. But is the timeline we're sending
3258 * still the one recovery is recovering from?
3259 */
3260 if (sendTimeLine != SendRqstTLI)
3261 becameHistoric = true;
3262 }
3263
3264 if (becameHistoric)
3265 {
3266 /*
3267 * The timeline we were sending has become historic. Read the
3268 * timeline history file of the new timeline to see where exactly
3269 * we forked off from the timeline we were sending.
3270 */
3271 List *history;
3272
3273 history = readTimeLineHistory(SendRqstTLI);
3275
3277 list_free_deep(history);
3278
3280
3281 SendRqstPtr = sendTimeLineValidUpto;
3282 }
3283 }
3284 else
3285 {
3286 /*
3287 * Streaming the current timeline on a primary.
3288 *
3289 * Attempt to send all data that's already been written out and
3290 * fsync'd to disk. We cannot go further than what's been written out
3291 * given the current implementation of WALRead(). And in any case
3292 * it's unsafe to send WAL that is not securely down to disk on the
3293 * primary: if the primary subsequently crashes and restarts, standbys
3294 * must not have applied any WAL that got lost on the primary.
3295 */
3296 SendRqstPtr = GetFlushRecPtr(NULL);
3297 }
3298
3299 /*
3300 * Record the current system time as an approximation of the time at which
3301 * this WAL location was written for the purposes of lag tracking.
3302 *
3303 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3304 * is flushed and we could get that time as well as the LSN when we call
3305 * GetFlushRecPtr() above (and likewise for the cascading standby
3306 * equivalent), but rather than putting any new code into the hot WAL path
3307 * it seems good enough to capture the time here. We should reach this
3308 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3309 * may take some time, we read the WAL flush pointer and take the time
3310 * very close together here so that we'll get a later position if it is
3311 * still moving.
3312 *
3313 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3314 * this gives us a cheap approximation for the WAL flush time for this
3315 * LSN.
3316 *
3317 * Note that the LSN is not necessarily the LSN for the data contained in
3318 * the present message; it's the end of the WAL, which might be further
3319 * ahead. All the lag tracking machinery cares about is finding out when
3320 * that arbitrary LSN is eventually reported as written, flushed and
3321 * applied, so that it can measure the elapsed time.
3322 */
3323 LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3324
3325 /*
3326 * If this is a historic timeline and we've reached the point where we
3327 * forked to the next timeline, stop streaming.
3328 *
3329 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3330 * startup process will normally replay all WAL that has been received
3331 * from the primary, before promoting, but if the WAL streaming is
3332 * terminated at a WAL page boundary, the valid portion of the timeline
3333 * might end in the middle of a WAL record. We might've already sent the
3334 * first half of that partial WAL record to the cascading standby, so that
3335 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3336 * replay the partial WAL record either, so it can still follow our
3337 * timeline switch.
3338 */
3340 {
3341 /* close the current file. */
3342 if (xlogreader->seg.ws_file >= 0)
3344
3345 /* Send CopyDone */
3347 streamingDoneSending = true;
3348
3349 WalSndCaughtUp = true;
3350
3351 elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
3354 return;
3355 }
3356
3357 /* Do we have any work to do? */
3358 Assert(sentPtr <= SendRqstPtr);
3359 if (SendRqstPtr <= sentPtr)
3360 {
3361 WalSndCaughtUp = true;
3362 return;
3363 }
3364
3365 /*
3366 * Figure out how much to send in one message. If there's no more than
3367 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3368 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3369 *
3370 * The rounding is not only for performance reasons. Walreceiver relies on
3371 * the fact that we never split a WAL record across two messages. Since a
3372 * long WAL record is split at page boundary into continuation records,
3373 * page boundary is always a safe cut-off point. We also assume that
3374 * SendRqstPtr never points to the middle of a WAL record.
3375 */
3376 startptr = sentPtr;
3377 endptr = startptr;
3378 endptr += MAX_SEND_SIZE;
3379
3380 /* if we went beyond SendRqstPtr, back off */
3381 if (SendRqstPtr <= endptr)
3382 {
3383 endptr = SendRqstPtr;
3385 WalSndCaughtUp = false;
3386 else
3387 WalSndCaughtUp = true;
3388 }
3389 else
3390 {
3391 /* round down to page boundary. */
3392 endptr -= (endptr % XLOG_BLCKSZ);
3393 WalSndCaughtUp = false;
3394 }
3395
3396 nbytes = endptr - startptr;
3397 Assert(nbytes <= MAX_SEND_SIZE);
3398
3399 /*
3400 * OK to read and send the slice.
3401 */
3404
3405 pq_sendint64(&output_message, startptr); /* dataStart */
3406 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3407 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3408
3409 /*
3410 * Read the log directly into the output buffer to avoid extra memcpy
3411 * calls.
3412 */
3414
3415retry:
3416 /* attempt to read WAL from WAL buffers first */
3418 startptr, nbytes, xlogreader->seg.ws_tli);
3419 output_message.len += rbytes;
3420 startptr += rbytes;
3421 nbytes -= rbytes;
3422
3423 /* now read the remaining WAL from WAL file */
3424 if (nbytes > 0 &&
3427 startptr,
3428 nbytes,
3429 xlogreader->seg.ws_tli, /* Pass the current TLI because
3430 * only WalSndSegmentOpen controls
3431 * whether new TLI is needed. */
3432 &errinfo))
3433 WALReadRaiseError(&errinfo);
3434
3435 /* See logical_read_xlog_page(). */
3436 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3438
3439 /*
3440 * During recovery, the currently-open WAL file might be replaced with the
3441 * file of the same name retrieved from archive. So we always need to
3442 * check what we read was valid after reading into the buffer. If it's
3443 * invalid, we try to open and read the file again.
3444 */
3446 {
3447 WalSnd *walsnd = MyWalSnd;
3448 bool reload;
3449
3450 SpinLockAcquire(&walsnd->mutex);
3451 reload = walsnd->needreload;
3452 walsnd->needreload = false;
3453 SpinLockRelease(&walsnd->mutex);
3454
3455 if (reload && xlogreader->seg.ws_file >= 0)
3456 {
3458
3459 goto retry;
3460 }
3461 }
3462
3463 output_message.len += nbytes;
3465
3466 /*
3467 * Fill the send timestamp last, so that it is taken as late as possible.
3468 */
3471 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3472 tmpbuf.data, sizeof(int64));
3473
3475
3476 sentPtr = endptr;
3477
3478 /* Update shared memory status */
3479 {
3480 WalSnd *walsnd = MyWalSnd;
3481
3482 SpinLockAcquire(&walsnd->mutex);
3483 walsnd->sentPtr = sentPtr;
3484 SpinLockRelease(&walsnd->mutex);
3485 }
3486
3487 /* Report progress of XLOG streaming in PS display */
3489 {
3490 char activitymsg[50];
3491
3492 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
3494 set_ps_display(activitymsg);
3495 }
3496}
bool update_process_title
Definition: ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:337
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:270
#define MAX_SEND_SIZE
Definition: walsender.c:114
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition: xlog.c:1755

References am_cascading_walsender, Assert(), CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqMsg_CopyDone, PqReplMsg_WALData, readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 126 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 252 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 216 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

◆ replication_active

volatile sig_atomic_t replication_active = false
static

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 199 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 155 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 156 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

bool waiting_for_ping_response = false
static

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 138 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader