PostgreSQL Source Code git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessStandbyPSRequestMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 226 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 114 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 103 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 258 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd cmd)
static

Definition at line 1409 of file walsender.c.

1410{
1411 bool failover_given = false;
1412 bool two_phase_given = false;
1413 bool failover;
1414 bool two_phase;
1415
1416 /* Parse options */
1417 foreach_ptr(DefElem, defel, cmd->options)
1418 {
1419 if (strcmp(defel->defname, "failover") == 0)
1420 {
1421 if (failover_given)
1422 ereport(ERROR,
1423 (errcode(ERRCODE_SYNTAX_ERROR),
1424 errmsg("conflicting or redundant options")));
1425 failover_given = true;
1426 failover = defGetBoolean(defel);
1427 }
1428 else if (strcmp(defel->defname, "two_phase") == 0)
1429 {
1430 if (two_phase_given)
1431 ereport(ERROR,
1432 (errcode(ERRCODE_SYNTAX_ERROR),
1433 errmsg("conflicting or redundant options")));
1434 two_phase_given = true;
1435 two_phase = defGetBoolean(defel);
1436 }
1437 else
1438 elog(ERROR, "unrecognized option: %s", defel->defname);
1439 }
1440
1442 failover_given ? &failover : NULL,
1443 two_phase_given ? &two_phase : NULL);
1444}
bool defGetBoolean(DefElem *def)
Definition: define.c:94
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
static bool failover
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:915

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, failover, foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd cmd)
static

Definition at line 1195 of file walsender.c.

1196{
1197 const char *snapshot_name = NULL;
1198 char xloc[MAXFNAMELEN];
1199 char *slot_name;
1200 bool reserve_wal = false;
1201 bool two_phase = false;
1202 bool failover = false;
1203 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1205 TupOutputState *tstate;
1206 TupleDesc tupdesc;
1207 Datum values[4];
1208 bool nulls[4] = {0};
1209
1211
1212 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1213 &failover);
1214
1215 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1216 {
1217 ReplicationSlotCreate(cmd->slotname, false,
1219 false, false, false);
1220
1221 if (reserve_wal)
1222 {
1224
1226
1227 /* Write this slot to disk if it's a permanent one. */
1228 if (!cmd->temporary)
1230 }
1231 }
1232 else
1233 {
1235 bool need_full_snapshot = false;
1236
1238
1240
1241 /*
1242 * Initially create persistent slot as ephemeral - that allows us to
1243 * nicely handle errors during initialization because it'll get
1244 * dropped if this transaction fails. We'll make it persistent at the
1245 * end. Temporary slots can be created as temporary from beginning as
1246 * they get dropped on error as well.
1247 */
1250 two_phase, failover, false);
1251
1252 /*
1253 * Do options check early so that we can bail before calling the
1254 * DecodingContextFindStartpoint which can take long time.
1255 */
1256 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1257 {
1258 if (IsTransactionBlock())
1259 ereport(ERROR,
1260 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1261 (errmsg("%s must not be called inside a transaction",
1262 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1263
1264 need_full_snapshot = true;
1265 }
1266 else if (snapshot_action == CRS_USE_SNAPSHOT)
1267 {
1268 if (!IsTransactionBlock())
1269 ereport(ERROR,
1270 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1271 (errmsg("%s must be called inside a transaction",
1272 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1273
1275 ereport(ERROR,
1276 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1277 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1278 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1279 if (!XactReadOnly)
1280 ereport(ERROR,
1281 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1282 (errmsg("%s must be called in a read-only transaction",
1283 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1284
1285 if (FirstSnapshotSet)
1286 ereport(ERROR,
1287 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1288 (errmsg("%s must be called before any query",
1289 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1290
1291 if (IsSubTransaction())
1292 ereport(ERROR,
1293 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1294 (errmsg("%s must not be called in a subtransaction",
1295 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1296
1297 need_full_snapshot = true;
1298 }
1299
1300 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1303 .segment_open = WalSndSegmentOpen,
1304 .segment_close = wal_segment_close),
1307
1308 /*
1309 * Signal that we don't need the timeout mechanism. We're just
1310 * creating the replication slot and don't yet accept feedback
1311 * messages or send keepalives. As we possibly need to wait for
1312 * further WAL the walsender would otherwise possibly be killed too
1313 * soon.
1314 */
1316
1317 /* build initial snapshot, might take a while */
1319
1320 /*
1321 * Export or use the snapshot if we've been asked to do so.
1322 *
1323 * NB. We will convert the snapbuild.c kind of snapshot to normal
1324 * snapshot when doing this.
1325 */
1326 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1327 {
1328 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1329 }
1330 else if (snapshot_action == CRS_USE_SNAPSHOT)
1331 {
1332 Snapshot snap;
1333
1336 }
1337
1338 /* don't need the decoding context anymore */
1340
1341 if (!cmd->temporary)
1343 }
1344
1345 snprintf(xloc, sizeof(xloc), "%X/%08X",
1347
1349
1350 /*----------
1351 * Need a tuple descriptor representing four columns:
1352 * - first field: the slot name
1353 * - second field: LSN at which we became consistent
1354 * - third field: exported snapshot's name
1355 * - fourth field: output plugin
1356 */
1357 tupdesc = CreateTemplateTupleDesc(4);
1358 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1359 TEXTOID, -1, 0);
1360 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1361 TEXTOID, -1, 0);
1362 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1363 TEXTOID, -1, 0);
1364 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1365 TEXTOID, -1, 0);
1366
1367 /* prepare for projection of tuples */
1368 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1369
1370 /* slot_name */
1371 slot_name = NameStr(MyReplicationSlot->data.name);
1372 values[0] = CStringGetTextDatum(slot_name);
1373
1374 /* consistent wal location */
1375 values[1] = CStringGetTextDatum(xloc);
1376
1377 /* snapshot name, or NULL if none */
1378 if (snapshot_name != NULL)
1379 values[2] = CStringGetTextDatum(snapshot_name);
1380 else
1381 nulls[2] = true;
1382
1383 /* plugin, or NULL if none */
1384 if (cmd->plugin != NULL)
1386 else
1387 nulls[3] = true;
1388
1389 /* send it to dest */
1390 do_tup_output(tstate, values, nulls);
1391 end_tup_output(tstate);
1392
1394}
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:754
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2464
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2522
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2444
Assert(PointerIsAligned(start, uint64))
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:677
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:633
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:332
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:111
#define NIL
Definition: pg_list.h:68
#define snprintf
Definition: port.h:260
uint64_t Datum
Definition: postgres.h:70
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:384
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1139
void ReplicationSlotReserveWal(void)
Definition: slot.c:1572
void ReplicationSlotPersist(void)
Definition: slot.c:1156
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
void ReplicationSlotSave(void)
Definition: slot.c:1121
void ReplicationSlotRelease(void)
Definition: slot.c:764
@ RS_PERSISTENT
Definition: slot.h:45
@ RS_EPHEMERAL
Definition: slot.h:46
@ RS_TEMPORARY
Definition: slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:440
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
bool FirstSnapshotSet
Definition: snapmgr.c:193
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1856
PGPROC * MyProc
Definition: proc.c:67
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:136
ReplicationSlotPersistentData data
Definition: slot.h:210
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:182
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:918
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1118
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:3107
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1571
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1667
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1045
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1544
static TimestampTz last_reply_timestamp
Definition: walsender.c:187
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:83
int XactIsoLevel
Definition: xact.c:80
bool IsSubTransaction(void)
Definition: xact.c:5062
bool IsTransactionBlock(void)
Definition: xact.c:4989
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References Assert(), begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, failover, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd cmd)
static

Definition at line 1400 of file walsender.c.

1401{
1402 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1403}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:892

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1988 of file walsender.c.

1989{
1990 yyscan_t scanner;
1991 int parse_rc;
1992 Node *cmd_node;
1993 const char *cmdtag;
1994 MemoryContext old_context = CurrentMemoryContext;
1995
1996 /* We save and re-use the cmd_context across calls */
1997 static MemoryContext cmd_context = NULL;
1998
1999 /*
2000 * If WAL sender has been told that shutdown is getting close, switch its
2001 * status accordingly to handle the next replication commands correctly.
2002 */
2003 if (got_STOPPING)
2005
2006 /*
2007 * Throw error if in stopping mode. We need prevent commands that could
2008 * generate WAL while the shutdown checkpoint is being written. To be
2009 * safe, we just prohibit all new commands.
2010 */
2012 ereport(ERROR,
2013 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2014 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2015
2016 /*
2017 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2018 * command arrives. Clean up the old stuff if there's anything.
2019 */
2021
2023
2024 /*
2025 * Prepare to parse and execute the command.
2026 *
2027 * Because replication command execution can involve beginning or ending
2028 * transactions, we need a working context that will survive that, so we
2029 * make it a child of TopMemoryContext. That in turn creates a hazard of
2030 * long-lived memory leaks if we lose track of the working context. We
2031 * deal with that by creating it only once per walsender, and resetting it
2032 * for each new command. (Normally this reset is a no-op, but if the
2033 * prior exec_replication_command call failed with an error, it won't be.)
2034 *
2035 * This is subtler than it looks. The transactions we manage can extend
2036 * across replication commands, indeed SnapBuildClearExportedSnapshot
2037 * might have just ended one. Because transaction exit will revert to the
2038 * memory context that was current at transaction start, we need to be
2039 * sure that that context is still valid. That motivates re-using the
2040 * same cmd_context rather than making a new one each time.
2041 */
2042 if (cmd_context == NULL)
2044 "Replication command context",
2046 else
2047 MemoryContextReset(cmd_context);
2048
2049 MemoryContextSwitchTo(cmd_context);
2050
2051 replication_scanner_init(cmd_string, &scanner);
2052
2053 /*
2054 * Is it a WalSender command?
2055 */
2057 {
2058 /* Nope; clean up and get out. */
2060
2061 MemoryContextSwitchTo(old_context);
2062 MemoryContextReset(cmd_context);
2063
2064 /* XXX this is a pretty random place to make this check */
2065 if (MyDatabaseId == InvalidOid)
2066 ereport(ERROR,
2067 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2068 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2069
2070 /* Tell the caller that this wasn't a WalSender command. */
2071 return false;
2072 }
2073
2074 /*
2075 * Looks like a WalSender command, so parse it.
2076 */
2077 parse_rc = replication_yyparse(&cmd_node, scanner);
2078 if (parse_rc != 0)
2079 ereport(ERROR,
2080 (errcode(ERRCODE_SYNTAX_ERROR),
2081 errmsg_internal("replication command parser returned %d",
2082 parse_rc)));
2084
2085 /*
2086 * Report query to various monitoring facilities. For this purpose, we
2087 * report replication commands just like SQL commands.
2088 */
2089 debug_query_string = cmd_string;
2090
2092
2093 /*
2094 * Log replication command if log_replication_commands is enabled. Even
2095 * when it's disabled, log the command with DEBUG1 level for backward
2096 * compatibility.
2097 */
2099 (errmsg("received replication command: %s", cmd_string)));
2100
2101 /*
2102 * Disallow replication commands in aborted transaction blocks.
2103 */
2105 ereport(ERROR,
2106 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2107 errmsg("current transaction is aborted, "
2108 "commands ignored until end of transaction block")));
2109
2111
2112 /*
2113 * Allocate buffers that will be used for each outgoing and incoming
2114 * message. We do this just once per command to reduce palloc overhead.
2115 */
2119
2120 switch (cmd_node->type)
2121 {
2122 case T_IdentifySystemCmd:
2123 cmdtag = "IDENTIFY_SYSTEM";
2124 set_ps_display(cmdtag);
2126 EndReplicationCommand(cmdtag);
2127 break;
2128
2129 case T_ReadReplicationSlotCmd:
2130 cmdtag = "READ_REPLICATION_SLOT";
2131 set_ps_display(cmdtag);
2133 EndReplicationCommand(cmdtag);
2134 break;
2135
2136 case T_BaseBackupCmd:
2137 cmdtag = "BASE_BACKUP";
2138 set_ps_display(cmdtag);
2139 PreventInTransactionBlock(true, cmdtag);
2141 EndReplicationCommand(cmdtag);
2142 break;
2143
2144 case T_CreateReplicationSlotCmd:
2145 cmdtag = "CREATE_REPLICATION_SLOT";
2146 set_ps_display(cmdtag);
2148 EndReplicationCommand(cmdtag);
2149 break;
2150
2151 case T_DropReplicationSlotCmd:
2152 cmdtag = "DROP_REPLICATION_SLOT";
2153 set_ps_display(cmdtag);
2155 EndReplicationCommand(cmdtag);
2156 break;
2157
2158 case T_AlterReplicationSlotCmd:
2159 cmdtag = "ALTER_REPLICATION_SLOT";
2160 set_ps_display(cmdtag);
2162 EndReplicationCommand(cmdtag);
2163 break;
2164
2165 case T_StartReplicationCmd:
2166 {
2167 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2168
2169 cmdtag = "START_REPLICATION";
2170 set_ps_display(cmdtag);
2171 PreventInTransactionBlock(true, cmdtag);
2172
2173 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2174 StartReplication(cmd);
2175 else
2177
2178 /* dupe, but necessary per libpqrcv_endstreaming */
2179 EndReplicationCommand(cmdtag);
2180
2181 Assert(xlogreader != NULL);
2182 break;
2183 }
2184
2185 case T_TimeLineHistoryCmd:
2186 cmdtag = "TIMELINE_HISTORY";
2187 set_ps_display(cmdtag);
2188 PreventInTransactionBlock(true, cmdtag);
2190 EndReplicationCommand(cmdtag);
2191 break;
2192
2193 case T_VariableShowStmt:
2194 {
2196 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2197
2198 cmdtag = "SHOW";
2199 set_ps_display(cmdtag);
2200
2201 /* syscache access needs a transaction environment */
2203 GetPGVariable(n->name, dest);
2205 EndReplicationCommand(cmdtag);
2206 }
2207 break;
2208
2209 case T_UploadManifestCmd:
2210 cmdtag = "UPLOAD_MANIFEST";
2211 set_ps_display(cmdtag);
2212 PreventInTransactionBlock(true, cmdtag);
2214 EndReplicationCommand(cmdtag);
2215 break;
2216
2217 default:
2218 elog(ERROR, "unrecognized replication command node tag: %u",
2219 cmd_node->type);
2220 }
2221
2222 /*
2223 * Done. Revert to caller's memory context, and clean out the cmd_context
2224 * to recover memory right away.
2225 */
2226 MemoryContextSwitchTo(old_context);
2227 MemoryContextReset(cmd_context);
2228
2229 /*
2230 * We need not update ps display or pg_stat_activity, because PostgresMain
2231 * will reset those to "idle". But we must reset debug_query_string to
2232 * ensure it doesn't become a dangling pointer.
2233 */
2234 debug_query_string = NULL;
2235
2236 return true;
2237}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:65
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:408
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
MemoryContext TopMemoryContext
Definition: mcxt.c:166
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:89
#define InvalidOid
Definition: postgres_ext.h:37
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1409
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:581
WalSnd * MyWalSnd
Definition: walsender.c:120
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:482
static StringInfoData tmpbuf
Definition: walsender.c:178
static void IdentifySystem(void)
Definition: walsender.c:401
static StringInfoData reply_message
Definition: walsender.c:177
void WalSndSetState(WalSndState state)
Definition: walsender.c:3936
static StringInfoData output_message
Definition: walsender.c:176
static void UploadManifest(void)
Definition: walsender.c:671
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:206
bool log_replication_commands
Definition: walsender.c:133
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1195
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1451
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:155
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1400
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:813
static XLogReaderState * xlogreader
Definition: walsender.c:145
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3666
void StartTransactionCommand(void)
Definition: xact.c:3077
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:408
void CommitTransactionCommand(void)
Definition: xact.c:3175

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)

Definition at line 3631 of file walsender.c.

3632{
3633 XLogRecPtr replayPtr;
3634 TimeLineID replayTLI;
3635 XLogRecPtr receivePtr;
3636 TimeLineID receiveTLI;
3637 XLogRecPtr result;
3638
3639 Assert(am_cascading_walsender || IsSyncingReplicationSlots());
3640
3641 /*
3642 * We can safely send what's already been replayed. Also, if walreceiver
3643 * is streaming WAL from the same timeline, we can send anything that it
3644 * has streamed, but hasn't been replayed yet.
3645 */
3646
3647 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3648 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3649
3650 if (tli)
3651 *tli = replayTLI;
3652
3653 result = replayPtr;
3654 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3655 result = receivePtr;
3656
3657 return result;
3658}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1754
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:124
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:63
static TimeLineID receiveTLI
Definition: xlogrecovery.c:266
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *  offset,
IncrementalBackupInfo * ib 
)
static

Definition at line 737 of file walsender.c.

739{
740 int mtype;
741 int maxmsglen;
742
743 HOLD_CANCEL_INTERRUPTS();
744
745 pq_startmsgread();
746 mtype = pq_getbyte();
747 if (mtype == EOF)
748 ereport(ERROR,
749 (errcode(ERRCODE_CONNECTION_FAILURE),
750 errmsg("unexpected EOF on client connection with an open transaction")));
751
752 switch (mtype)
753 {
754 case PqMsg_CopyData:
755 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
756 break;
757 case PqMsg_CopyDone:
758 case PqMsg_CopyFail:
759 case PqMsg_Flush:
760 case PqMsg_Sync:
761 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
762 break;
763 default:
764 ereport(ERROR,
765 (errcode(ERRCODE_PROTOCOL_VIOLATION),
766 errmsg("unexpected message type 0x%02X during COPY from stdin",
767 mtype)));
768 maxmsglen = 0; /* keep compiler quiet */
769 break;
770 }
771
772 /* Now collect the message body */
773 if (pq_getmessage(buf, maxmsglen))
774 ereport(ERROR,
775 (errcode(ERRCODE_CONNECTION_FAILURE),
776 errmsg("unexpected EOF on client connection with an open transaction")));
777 RESUME_CANCEL_INTERRUPTS();
778
779 /* Process the message */
780 switch (mtype)
781 {
782 case PqMsg_CopyData:
783 AppendIncrementalManifestData(ib, buf->data, buf->len);
784 return true;
785
786 case PqMsg_CopyDone:
787 return false;
788
789 case PqMsg_Sync:
790 case PqMsg_Flush:
791 /* Ignore these while in CopyOut mode as we do elsewhere. */
792 return true;
793
794 case PqMsg_CopyFail:
795 ereport(ERROR,
796 (errcode(ERRCODE_QUERY_CANCELED),
797 errmsg("COPY from stdin failed: %s",
798 pq_getmsgstring(buf))));
799 }
800
801 /* Not reached. */
802 Assert(false);
803 return false;
804}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:142
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:144
static char * buf
Definition: pg_test_fsync.c:72
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1203
int pq_getbyte(void)
Definition: pqcomm.c:963
void pq_startmsgread(void)
Definition: pqcomm.c:1141
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Sync
Definition: protocol.h:27
#define PqMsg_CopyFail
Definition: protocol.h:29
#define PqMsg_Flush
Definition: protocol.h:24

References AppendIncrementalManifestData(), Assert(), buf, ereport, errcode(), errmsg(), ERROR, HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_CopyFail, PqMsg_Flush, PqMsg_Sync, and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3687 of file walsender.c.

3688{
3689 Assert(am_walsender);
3690
3691 /*
3692 * If replication has not yet started, die like with SIGTERM. If
3693 * replication is active, only set a flag and wake up the main loop. It
3694 * will send any outstanding WAL, wait for it to be replicated to the
3695 * standby, and then exit gracefully.
3696 */
3697 if (!replication_active)
3698 kill(MyProcPid, SIGTERM);
3699 else
3700 got_STOPPING = true;
3701}
int MyProcPid
Definition: globals.c:47
bool am_walsender
Definition: walsender.c:123
static volatile sig_atomic_t replication_active
Definition: walsender.c:214
#define kill(pid, sig)
Definition: win32_port.h:493

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 401 of file walsender.c.

402{
403 char sysid[32];
404 char xloc[MAXFNAMELEN];
405 XLogRecPtr logptr;
406 char *dbname = NULL;
407 DestReceiver *dest;
408 TupOutputState *tstate;
409 TupleDesc tupdesc;
410 Datum values[4];
411 bool nulls[4] = {0};
412 TimeLineID currTLI;
413
414 /*
415 * Reply with a result set with one row, four columns. First col is system
416 * ID, second is timeline ID, third is current xlog location and the
417 * fourth contains the database name if we are connected to one.
418 */
419
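420 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
421 GetSystemIdentifier());
422
423 am_cascading_walsender = RecoveryInProgress();
424 if (am_cascading_walsender)
425 logptr = GetStandbyFlushRecPtr(&currTLI);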
426 else
427 logptr = GetFlushRecPtr(&currTLI);
428
429 snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
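430
431 if (MyDatabaseId != InvalidOid)
432 {
433 MemoryContext cur = CurrentMemoryContext;
434
435 /* syscache access needs a transaction env. */
436 StartTransactionCommand();
437 dbname = get_database_name(MyDatabaseId);
438 /* copy dbname out of TX context */
439 dbname = MemoryContextStrdup(cur, dbname);
440 CommitTransactionCommand();
441 }
442
443 dest = CreateDestReceiver(DestRemoteSimple);
444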
445 /* need a tuple descriptor representing four columns */
446 tupdesc = CreateTemplateTupleDesc(4);
447 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
448 TEXTOID, -1, 0);
449 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
450 INT8OID, -1, 0);
451 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
452 TEXTOID, -1, 0);
453 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
454 TEXTOID, -1, 0);
455
456 /* prepare for projection of tuples */
457 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
458
459 /* column 1: system identifier */
460 values[0] = CStringGetTextDatum(sysid);
461
462 /* column 2: timeline */
463 values[1] = Int64GetDatum(currTLI);
464
465 /* column 3: wal location */
466 values[2] = CStringGetTextDatum(xloc);
467
468 /* column 4: database name, or NULL if none */
469 if (dbname)
470 values[3] = CStringGetTextDatum(dbname);
471 else
472 nulls[3] = true;
473
474 /* send it to dest */
475 do_tup_output(tstate, values, nulls);
476
477 end_tup_output(tstate);
478}
#define UINT64_FORMAT
Definition: c.h:560
struct cursor * cur
Definition: ecpg.c:29
char * get_database_name(Oid dbid)
Definition: lsyscache.c:1259
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1746
static Datum Int64GetDatum(int64 X)
Definition: postgres.h:403
char * dbname
Definition: streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3631
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4609
bool RecoveryInProgress(void)
Definition: xlog.c:6406
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6571

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 301 of file walsender.c.

302{
303 am_cascading_walsender = RecoveryInProgress();
304
305 /* Create a per-walsender data structure in shared memory */
306 InitWalSenderSlot();
307
308 /* need resource owner for e.g. basebackups */
309 CreateAuxProcessResourceOwner();
310
311 /*
312 * Let postmaster know that we're a WAL sender. Once we've declared us as
313 * a WAL sender process, postmaster will let us outlive the bgwriter and
314 * kill us last in the shutdown sequence, so we get a chance to stream all
315 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
316 * there's no going back, and we mustn't write any WAL records after this.
317 */
318 MarkPostmasterChildWalSender();
319 SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
320
321 /*
322 * If the client didn't specify a database to connect to, show in PGPROC
323 * that our advertised xmin should affect vacuum horizons in all
324 * databases. This allows physical replication clients to send hot
325 * standby feedback that will delay vacuum cleanup in all databases.
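326 */
327 if (MyDatabaseId == InvalidOid)
328 {
329 Assert(MyProc->xmin == InvalidTransactionId);
330 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
331 MyProc->statusFlags |= PROC_AFFECTS_ALL_HORIZONS;
332 ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
333 LWLockRelease(ProcArrayLock);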
334 }
335
336 /* Initialize empty timestamp buffer for lag tracking. */
337 lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
338}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1263
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:996
PROC_HDR * ProcGlobal
Definition: proc.c:79
TransactionId xmin
Definition: proc.h:194
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:3015
static LagTracker * lag_tracker
Definition: walsender.c:252

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 3015 of file walsender.c.

3016{
3017 int i;
3018
3019 /*
3020 * WalSndCtl should be set up already (we inherit this by fork() or
3021 * EXEC_BACKEND mechanism from the postmaster).
3022 */
3023 Assert(WalSndCtl != NULL);
3024 Assert(MyWalSnd == NULL);
3025
3026 /*
3027 * Find a free walsender slot and reserve it. This must not fail due to
3028 * the prior check for free WAL senders in InitProcess().
3029 */
3030 for (i = 0; i < max_wal_senders; i++)
3031 {
3032 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3033
3034 SpinLockAcquire(&walsnd->mutex);
3035
3036 if (walsnd->pid != 0)
3037 {
3038 SpinLockRelease(&walsnd->mutex);
3039 continue;
3040 }
3041 else
3042 {
3043 /*
3044 * Found a free slot. Reserve it for us.
3045 */
3046 walsnd->pid = MyProcPid;
3047 walsnd->state = WALSNDSTATE_STARTUP;
3048 walsnd->sentPtr = InvalidXLogRecPtr;
3049 walsnd->needreload = false;
3050 walsnd->write = InvalidXLogRecPtr;
3051 walsnd->flush = InvalidXLogRecPtr;
3052 walsnd->apply = InvalidXLogRecPtr;
3053 walsnd->writeLag = -1;
3054 walsnd->flushLag = -1;
3055 walsnd->applyLag = -1;
3056 walsnd->sync_standby_priority = 0;
3057 walsnd->replyTime = 0;
3058
3059 /*
3060 * The kind assignment is done here and not in StartReplication()
3061 * and StartLogicalReplication(). Indeed, the logical walsender
3062 * needs to read WAL records (like snapshot of running
3063 * transactions) during the slot creation. So it needs to be woken
3064 * up based on its kind.
3065 *
3066 * The kind assignment could also be done in StartReplication(),
3067 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
3068 * seems better to set it on one place.
3069 */
3070 if (MyDatabaseId == InvalidOid)
3071 walsnd->kind = REPLICATION_KIND_PHYSICAL;
3072 else
3073 walsnd->kind = REPLICATION_KIND_LOGICAL;
3074
3075 SpinLockRelease(&walsnd->mutex);
3076 /* don't need the lock anymore */
3077 MyWalSnd = walsnd;
3078
3079 break;
3080 }
3081 }
3082
3083 Assert(MyWalSnd != NULL);
3084
3085 /* Arrange to clean up at walsender exit */
3086 on_shmem_exit(WalSndKill, 0);
3087}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:77
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:129
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:3091
WalSndCtlData * WalSndCtl
Definition: walsender.c:117
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert(), WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4280 of file walsender.c.

4281{
4282 TimestampTz time = 0;
4283
4284 /*
4285 * If 'lsn' has not passed the WAL position stored in the overflow entry,
4286 * return the elapsed time (in microseconds) since the saved local flush
4287 * time. If the flush time is in the future (due to clock drift), return
4288 * -1 to treat as no valid sample.
4289 *
4290 * Otherwise, switch back to using the buffer to control the read head and
4291 * compute the elapsed time. The read head is then reset to point to the
4292 * oldest entry in the buffer.
4293 */
4294 if (lag_tracker->read_heads[head] == -1)
4295 {
4296 if (lag_tracker->overflowed[head].lsn > lsn)
4297 return (now >= lag_tracker->overflowed[head].time) ?
4298 now - lag_tracker->overflowed[head].time : -1;
4299
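4300 time = lag_tracker->overflowed[head].time;
4301 lag_tracker->last_read[head] = lag_tracker->overflowed[head];
4302 lag_tracker->read_heads[head] =
4303 (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4304 }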
4305
4306 /* Read all unread samples up to this LSN or end of buffer. */
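4307 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4308 lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
4309 {
4310 time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4311 lag_tracker->last_read[head] =
4312 lag_tracker->buffer[lag_tracker->read_heads[head]];
4313 lag_tracker->read_heads[head] =
4314 (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
4315 }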
4316
4317 /*
4318 * If the lag tracker is empty, that means the standby has processed
4319 * everything we've ever sent so we should now clear 'last_read'. If we
4320 * didn't do that, we'd risk using a stale and irrelevant sample for
4321 * interpolation at the beginning of the next burst of WAL after a period
4322 * of idleness.
4323 */
4324 if (lag_tracker->read_heads[head] == lag_tracker->write_head)
4325 lag_tracker->last_read[head].time = 0;
4326
4327 if (time > now)
4328 {
4329 /* If the clock somehow went backwards, treat as not found. */
4330 return -1;
4331 }
4332 else if (time == 0)
4333 {
4334 /*
4335 * We didn't cross a time. If there is a future sample that we
4336 * haven't reached yet, and we've already reached at least one sample,
4337 * let's interpolate the local flushed time. This is mainly useful
4338 * for reporting a completely stuck apply position as having
4339 * increasing lag, since otherwise we'd have to wait for it to
4340 * eventually start moving again and cross one of our samples before
4341 * we can show the lag increasing.
4342 */
4343 if (lag_tracker->read_heads[head] == lag_tracker->write_head)
4344 {
4345 /* There are no future samples, so we can't interpolate. */
4346 return -1;
4347 }
4348 else if (lag_tracker->last_read[head].time != 0)
4349 {
4350 /* We can interpolate between last_read and the next sample. */
4351 double fraction;
4352 WalTimeSample prev = lag_tracker->last_read[head];
4353 WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
4354
4355 if (lsn < prev.lsn)
4356 {
4357 /*
4358 * Reported LSNs shouldn't normally go backwards, but it's
4359 * possible when there is a timeline change. Treat as not
4360 * found.
4361 */
4362 return -1;
4363 }
4364
4365 Assert(prev.lsn < next.lsn);
4366
4367 if (prev.time > next.time)
4368 {
4369 /* If the clock somehow went backwards, treat as not found. */
4370 return -1;
4371 }
4372
4373 /* See how far we are between the previous and next samples. */
4374 fraction =
4375 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4376
4377 /* Scale the local flush time proportionally. */
4378 time = (TimestampTz)
4379 ((double) prev.time + (next.time - prev.time) * fraction);
4380 }
4381 else
4382 {
4383 /*
4384 * We have only a future sample, implying that we were entirely
4385 * caught up but and now there is a new burst of WAL and the
4386 * standby hasn't processed the first sample yet. Until the
4387 * standby reaches the future sample the best we can do is report
4388 * the hypothetical lag if that sample were to be replayed now.
4389 */
4390 time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4391 }
4392 }
4393
4394 /* Return the elapsed time since local flush time in microseconds. */
4395 Assert(time != 0);
4396 return now - time;
4397}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
static int32 next
Definition: blutils.c:224
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:232
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:234
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:235
int write_head
Definition: walsender.c:233
WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:249
TimestampTz time
Definition: walsender.c:222
XLogRecPtr lsn
Definition: walsender.c:221
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:226

References Assert(), LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4222 of file walsender.c.

4223{
4224 int new_write_head;
4225 int i;
4226
4227 if (!am_walsender)
4228 return;
4229
4230 /*
4231 * If the lsn hasn't advanced since last time, then do nothing. This way
4232 * we only record a new sample when new WAL has been written.
4233 */
4234 if (lag_tracker->last_lsn == lsn)
4235 return;
4236 lag_tracker->last_lsn = lsn;
4237
4238 /*
4239 * If advancing the write head of the circular buffer would crash into any
4240 * of the read heads, then the buffer is full. In other words, the
4241 * slowest reader (presumably apply) is the one that controls the release
4242 * of space.
4243 */
4244 new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4245 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4246 {
4247 /*
4248 * If the buffer is full, move the slowest reader to a separate
4249 * overflow entry and free its space in the buffer so the write head
4250 * can advance.
4251 */
4252 if (new_write_head == lag_tracker->read_heads[i])
4253 {
4254 lag_tracker->overflowed[i] =
4255 lag_tracker->buffer[lag_tracker->read_heads[i]];
4256 lag_tracker->read_heads[i] = -1;
4257 }
4258 }
4259
4260 /* Store a sample at the current write head position. */
4261 lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
4262 lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4263 lag_tracker->write_head = new_write_head;
4264}
XLogRecPtr last_lsn
Definition: walsender.c:231
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState * state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1045 of file walsender.c.

1047{
1048 XLogRecPtr flushptr;
1049 int count;
1050 WALReadError errinfo;
1051 XLogSegNo segno;
1052 TimeLineID currTLI;
1053
1054 /*
1055 * Make sure we have enough WAL available before retrieving the current
1056 * timeline.
1057 */
1058 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1059
1060 /* Fail if not enough (implies we are going to shut down) */
1061 if (flushptr < targetPagePtr + reqLen)
1062 return -1;
1063
1064 /*
1065 * Since logical decoding is also permitted on a standby server, we need
1066 * to check if the server is in recovery to decide how to get the current
1067 * timeline ID (so that it also covers the promotion or timeline change
1068 * cases). We must determine am_cascading_walsender after waiting for the
1069 * required WAL so that it is correct when the walsender wakes up after a
1070 * promotion.
1071 */
1072 am_cascading_walsender = RecoveryInProgress();
1073
1074 if (am_cascading_walsender)
1075 GetXLogReplayRecPtr(&currTLI);
1076 else
1077 currTLI = GetWALInsertionTimeLine();
1078
1079 XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1080 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1081 sendTimeLine = state->currTLI;
1082 sendTimeLineValidUpto = state->currTLIValidUntil;
1083 sendTimeLineNextTLI = state->nextTLI;
1084
1085 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1086 count = XLOG_BLCKSZ; /* more than one block available */
1087 else
1088 count = flushptr - targetPagePtr; /* part of the page available */
1089
1090 /* now actually read the data, we know it's there */
1091 if (!WALRead(state,
1092 cur_page,
1093 targetPagePtr,
1094 count,
1095 currTLI, /* Pass the current TLI because only
1096 * WalSndSegmentOpen controls whether new TLI
1097 * is needed. */
1098 &errinfo))
1099 WALReadRaiseError(&errinfo);
1100
1101 /*
1102 * After reading into the buffer, check that what we read was valid. We do
1103 * this after reading, because even though the segment was present when we
1104 * opened it, it might get recycled or removed while we read it. The
1105 * read() succeeds in that case, but the data we tried to read might
1106 * already have been overwritten with new WAL records.
1107 */
1108 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1109 CheckXLogRemoved(segno, state->seg.ws_tli);
1110
1111 return count;
1112}
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:164
static bool sendTimeLineIsHistoric
Definition: walsender.c:166
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1817
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:165
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:167
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6592
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3746
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:52
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1514
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1757 of file walsender.c.

1758{
1759 int elevel = got_STOPPING ? ERROR : WARNING;
1760 bool failover_slot;
1761
1762 failover_slot = (replication_active && MyReplicationSlot->data.failover);
1763
1764 /*
1765 * Note that after receiving the shutdown signal, an ERROR is reported if
1766 * any slots are dropped, invalidated, or inactive. This measure is taken
1767 * to prevent the walsender from waiting indefinitely.
1768 */
1769 if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
1770 {
1771 *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1772 return true;
1773 }
1774
1775 *wait_event = 0;
1776 return false;
1777}
#define WARNING
Definition: elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2902

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1789 of file walsender.c.

1791{
1792 /* Check if we need to wait for WALs to be flushed to disk */
1793 if (target_lsn > flushed_lsn)
1794 {
1795 *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1796 return true;
1797 }
1798
1799 /* Check if the standby slots have caught up to the flushed position */
1800 return NeedToWaitForStandbys(flushed_lsn, wait_event);
1801}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1757

References NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 3974 of file walsender.c.

3975{
3976 Interval *result = palloc(sizeof(Interval));
3977
3978 result->month = 0;
3979 result->day = 0;
3980 result->time = offset;
3981
3982 return result;
3983}
void * palloc(Size size)
Definition: mcxt.c:1365
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd,
bool *  reserve_wal,
CRSSnapshotAction * snapshot_action,
bool *  two_phase,
bool *  failover 
)
static

Definition at line 1118 of file walsender.c.

1122{
1123 ListCell *lc;
1124 bool snapshot_action_given = false;
1125 bool reserve_wal_given = false;
1126 bool two_phase_given = false;
1127 bool failover_given = false;
1128
1129 /* Parse options */
1130 foreach(lc, cmd->options)
1131 {
1132 DefElem *defel = (DefElem *) lfirst(lc);
1133
1134 if (strcmp(defel->defname, "snapshot") == 0)
1135 {
1136 char *action;
1137
1138 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1139 ereport(ERROR,
1140 (errcode(ERRCODE_SYNTAX_ERROR),
1141 errmsg("conflicting or redundant options")));
1142
1143 action = defGetString(defel);
1144 snapshot_action_given = true;
1145
1146 if (strcmp(action, "export") == 0)
1147 *snapshot_action = CRS_EXPORT_SNAPSHOT;
1148 else if (strcmp(action, "nothing") == 0)
1149 *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1150 else if (strcmp(action, "use") == 0)
1151 *snapshot_action = CRS_USE_SNAPSHOT;
1152 else
1153 ereport(ERROR,
1154 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1155 errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1156 defel->defname, action)));
1157 }
1158 else if (strcmp(defel->defname, "reserve_wal") == 0)
1159 {
1160 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1161 ereport(ERROR,
1162 (errcode(ERRCODE_SYNTAX_ERROR),
1163 errmsg("conflicting or redundant options")));
1164
1165 reserve_wal_given = true;
1166 *reserve_wal = defGetBoolean(defel);
1167 }
1168 else if (strcmp(defel->defname, "two_phase") == 0)
1169 {
1170 if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1171 ereport(ERROR,
1172 (errcode(ERRCODE_SYNTAX_ERROR),
1173 errmsg("conflicting or redundant options")));
1174 two_phase_given = true;
1175 *two_phase = defGetBoolean(defel);
1176 }
1177 else if (strcmp(defel->defname, "failover") == 0)
1178 {
1179 if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1180 ereport(ERROR,
1181 (errcode(ERRCODE_SYNTAX_ERROR),
1182 errmsg("conflicting or redundant options")));
1183 failover_given = true;
1184 *failover = defGetBoolean(defel);
1185 }
1186 else
1187 elog(ERROR, "unrecognized option: %s", defel->defname);
1188 }
1189}
char * defGetString(DefElem *def)
Definition: define.c:35
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:843
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, failover, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3990 of file walsender.c.

3991{
3992#define PG_STAT_GET_WAL_SENDERS_COLS 12
3993 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3994 SyncRepStandbyData *sync_standbys;
3995 int num_standbys;
3996 int i;
3997
3998 InitMaterializedSRF(fcinfo, 0);
3999
4000 /*
4001 * Get the currently active synchronous standbys. This could be out of
4002 * date before we're done, but we'll use the data anyway.
4003 */
4004 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
4005
4006 for (i = 0; i < max_wal_senders; i++)
4007 {
4008 WalSnd *walsnd = &WalSndCtl->walsnds[i];
4009 XLogRecPtr sent_ptr;
4011 XLogRecPtr flush;
4012 XLogRecPtr apply;
4013 TimeOffset writeLag;
4014 TimeOffset flushLag;
4015 TimeOffset applyLag;
4016 int priority;
4017 int pid;
4019 TimestampTz replyTime;
4020 bool is_sync_standby;
4022 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
4023 int j;
4024
4025 /* Collect data from shared memory */
4026 SpinLockAcquire(&walsnd->mutex);
4027 if (walsnd->pid == 0)
4028 {
4029 SpinLockRelease(&walsnd->mutex);
4030 continue;
4031 }
4032 pid = walsnd->pid;
4033 sent_ptr = walsnd->sentPtr;
4034 state = walsnd->state;
4035 write = walsnd->write;
4036 flush = walsnd->flush;
4037 apply = walsnd->apply;
4038 writeLag = walsnd->writeLag;
4039 flushLag = walsnd->flushLag;
4040 applyLag = walsnd->applyLag;
4041 priority = walsnd->sync_standby_priority;
4042 replyTime = walsnd->replyTime;
4043 SpinLockRelease(&walsnd->mutex);
4044
4045 /*
4046 * Detect whether walsender is/was considered synchronous. We can
4047 * provide some protection against stale data by checking the PID
4048 * along with walsnd_index.
4049 */
4050 is_sync_standby = false;
4051 for (j = 0; j < num_standbys; j++)
4052 {
4053 if (sync_standbys[j].walsnd_index == i &&
4054 sync_standbys[j].pid == pid)
4055 {
4056 is_sync_standby = true;
4057 break;
4058 }
4059 }
4060
4061 values[0] = Int32GetDatum(pid);
4062
4063 if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
4064 {
4065 /*
4066 * Only superusers and roles with privileges of pg_read_all_stats
4067 * can see details. Other users only get the pid value to know
4068 * it's a walsender, but no details.
4069 */
4070 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4071 }
4072 else
4073 {
4075
4076 if (!XLogRecPtrIsValid(sent_ptr))
4077 nulls[2] = true;
4078 values[2] = LSNGetDatum(sent_ptr);
4079
4081 nulls[3] = true;
4082 values[3] = LSNGetDatum(write);
4083
4084 if (!XLogRecPtrIsValid(flush))
4085 nulls[4] = true;
4086 values[4] = LSNGetDatum(flush);
4087
4088 if (!XLogRecPtrIsValid(apply))
4089 nulls[5] = true;
4090 values[5] = LSNGetDatum(apply);
4091
4092 /*
4093 * Treat a standby such as a pg_basebackup background process
4094 * which always returns an invalid flush location, as an
4095 * asynchronous standby.
4096 */
4097 priority = XLogRecPtrIsValid(flush) ? priority : 0;
4098
4099 if (writeLag < 0)
4100 nulls[6] = true;
4101 else
4103
4104 if (flushLag < 0)
4105 nulls[7] = true;
4106 else
4108
4109 if (applyLag < 0)
4110 nulls[8] = true;
4111 else
4113
4114 values[9] = Int32GetDatum(priority);
4115
4116 /*
4117 * More easily understood version of standby state. This is purely
4118 * informational.
4119 *
4120 * In quorum-based sync replication, the role of each standby
4121 * listed in synchronous_standby_names can be changing very
4122 * frequently. Any standbys considered as "sync" at one moment can
4123 * be switched to "potential" ones at the next moment. So, it's
4124 * basically useless to report "sync" or "potential" as their sync
4125 * states. We report just "quorum" for them.
4126 */
4127 if (priority == 0)
4128 values[10] = CStringGetTextDatum("async");
4129 else if (is_sync_standby)
4131 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4132 else
4133 values[10] = CStringGetTextDatum("potential");
4134
4135 if (replyTime == 0)
4136 nulls[11] = true;
4137 else
4138 values[11] = TimestampTzGetDatum(replyTime);
4139 }
4140
4141 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4142 values, nulls);
4143 }
4144
4145 return (Datum) 0;
4146}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5284
#define MemSet(start, val, len)
Definition: c.h:1022
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:78
Oid GetUserId(void)
Definition: miscinit.c:469
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
TupleDesc setDesc
Definition: execnodes.h:364
Tuplestorestate * setResult
Definition: execnodes.h:363
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:754
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3974
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3955
WalSndState
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsValid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2395 of file walsender.c.

2396{
2397 bool changed = false;
2399
2401 SpinLockAcquire(&slot->mutex);
2402 if (slot->data.restart_lsn != lsn)
2403 {
2404 changed = true;
2405 slot->data.restart_lsn = lsn;
2406 }
2407 SpinLockRelease(&slot->mutex);
2408
2409 if (changed)
2410 {
2414 }
2415
2416 /*
2417 * One could argue that the slot should be saved to disk now, but that'd
2418 * be energy wasted - the worst thing lost information could cause here is
2419 * to give wrong information in a statistics view - we'll just potentially
2420 * be more conservative in removing files.
2421 */
2422}
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1234
slock_t mutex
Definition: slot.h:183
void PhysicalWakeupLogicalWalSnd(void)
Definition: walsender.c:1732

References Assert(), ReplicationSlot::data, ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, SpinLockRelease, and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2533 of file walsender.c.

2534{
2535 bool changed = false;
2537
2538 SpinLockAcquire(&slot->mutex);
2540
2541 /*
2542 * For physical replication we don't need the interlock provided by xmin
2543 * and effective_xmin since the consequences of a missed increase are
2544 * limited to query cancellations, so set both at once.
2545 */
2546 if (!TransactionIdIsNormal(slot->data.xmin) ||
2547 !TransactionIdIsNormal(feedbackXmin) ||
2548 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2549 {
2550 changed = true;
2551 slot->data.xmin = feedbackXmin;
2552 slot->effective_xmin = feedbackXmin;
2553 }
2555 !TransactionIdIsNormal(feedbackCatalogXmin) ||
2556 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2557 {
2558 changed = true;
2559 slot->data.catalog_xmin = feedbackCatalogXmin;
2560 slot->effective_catalog_xmin = feedbackCatalogXmin;
2561 }
2562 SpinLockRelease(&slot->mutex);
2563
2564 if (changed)
2565 {
2568 }
2569}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1178
TransactionId xmin
Definition: slot.h:114
TransactionId catalog_xmin
Definition: slot.h:122
TransactionId effective_catalog_xmin
Definition: slot.h:207
TransactionId effective_xmin
Definition: slot.h:206
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1732 of file walsender.c.

1733{
1735
1736 /*
1737 * If we are running in a standby, there is no need to wake up walsenders.
1738 * This is because we do not support syncing slots to cascading standbys,
1739 * so, there are no walsenders waiting for standbys to catch up.
1740 */
1741 if (RecoveryInProgress())
1742 return;
1743
1746}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2869
#define SlotIsPhysical(slot)
Definition: slot.h:284
ConditionVariable wal_confirm_rcv_cv

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1613 of file walsender.c.

1614{
1615 for (;;)
1616 {
1617 long sleeptime;
1618
1619 /* Check for input from the client */
1621
1622 /* die if timeout was reached */
1624
1625 /* Send keepalive if the time has come */
1627
1628 if (!pq_is_send_pending())
1629 break;
1630
1632
1633 /* Sleep until something happens or we time out */
1635 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1636
1637 /* Clear any already-pending wakeups */
1639
1641
1642 /* Process any requests or signals received recently */
1644 {
1645 ConfigReloadPending = false;
1648 }
1649
1650 /* Try to flush pending output to the client */
1651 if (pq_flush_if_writable() != 0)
1653 }
1654
1655 /* reactivate latch so WalSndLoop knows to continue */
1657}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
struct Latch * MyLatch
Definition: globals.c:63
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:290
void ResetLatch(Latch *latch)
Definition: latch.c:374
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:445
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_SOCKET_WRITEABLE
Definition: waiteventset.h:36
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3814
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2846
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2244
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:4184
static pg_noreturn void WalSndShutdown(void)
Definition: walsender.c:384
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2802

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2244 of file walsender.c.

2245{
2246 unsigned char firstchar;
2247 int maxmsglen;
2248 int r;
2249 bool received = false;
2250
2252
2253 /*
2254 * If we already received a CopyDone from the frontend, any subsequent
2255 * message is the beginning of a new command, and should be processed in
2256 * the main processing loop.
2257 */
2258 while (!streamingDoneReceiving)
2259 {
2261 r = pq_getbyte_if_available(&firstchar);
2262 if (r < 0)
2263 {
2264 /* unexpected error or EOF */
2266 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2267 errmsg("unexpected EOF on standby connection")));
2268 proc_exit(0);
2269 }
2270 if (r == 0)
2271 {
2272 /* no data available without blocking */
2273 pq_endmsgread();
2274 break;
2275 }
2276
2277 /* Validate message type and set packet size limit */
2278 switch (firstchar)
2279 {
2280 case PqMsg_CopyData:
2281 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2282 break;
2283 case PqMsg_CopyDone:
2284 case PqMsg_Terminate:
2285 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2286 break;
2287 default:
2288 ereport(FATAL,
2289 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2290 errmsg("invalid standby message type \"%c\"",
2291 firstchar)));
2292 maxmsglen = 0; /* keep compiler quiet */
2293 break;
2294 }
2295
2296 /* Read the message contents */
2298 if (pq_getmessage(&reply_message, maxmsglen))
2299 {
2301 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2302 errmsg("unexpected EOF on standby connection")));
2303 proc_exit(0);
2304 }
2305
2306 /* ... and process it */
2307 switch (firstchar)
2308 {
2309 /*
2310 * PqMsg_CopyData means a standby reply wrapped in a CopyData
2311 * packet.
2312 */
2313 case PqMsg_CopyData:
2315 received = true;
2316 break;
2317
2318 /*
2319 * PqMsg_CopyDone means the standby requested to finish
2320 * streaming. Reply with CopyDone, if we had not sent that
2321 * already.
2322 */
2323 case PqMsg_CopyDone:
2325 {
2327 streamingDoneSending = true;
2328 }
2329
2331 received = true;
2332 break;
2333
2334 /*
2335 * PqMsg_Terminate means that the standby is closing down the
2336 * socket.
2337 */
2338 case PqMsg_Terminate:
2339 proc_exit(0);
2340
2341 default:
2342 Assert(false); /* NOT REACHED */
2343 }
2344 }
2345
2346 /*
2347 * Save the last reply timestamp if we've received at least one reply.
2348 */
2349 if (received)
2350 {
2353 }
2354}
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1003
void pq_endmsgread(void)
Definition: pqcomm.c:1165
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
static bool waiting_for_ping_response
Definition: walsender.c:190
static TimestampTz last_processing
Definition: walsender.c:181
static bool streamingDoneSending
Definition: walsender.c:198
static void ProcessStandbyMessage(void)
Definition: walsender.c:2360
static bool streamingDoneReceiving
Definition: walsender.c:199

References Assert(), COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2613 of file walsender.c.

2614{
2615 TransactionId feedbackXmin;
2616 uint32 feedbackEpoch;
2617 TransactionId feedbackCatalogXmin;
2618 uint32 feedbackCatalogEpoch;
2619 TimestampTz replyTime;
2620
2621 /*
2622 * Decipher the reply message. The caller already consumed the msgtype
2623 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2624 * of this message.
2625 */
2626 replyTime = pq_getmsgint64(&reply_message);
2627 feedbackXmin = pq_getmsgint(&reply_message, 4);
2628 feedbackEpoch = pq_getmsgint(&reply_message, 4);
2629 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2630 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2631
2633 {
2634 char *replyTimeStr;
2635
2636 /* Copy because timestamptz_to_str returns a static buffer */
2637 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2638
2639 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2640 feedbackXmin,
2641 feedbackEpoch,
2642 feedbackCatalogXmin,
2643 feedbackCatalogEpoch,
2644 replyTimeStr);
2645
2646 pfree(replyTimeStr);
2647 }
2648
2649 /*
2650 * Update shared state for this WalSender process based on reply data from
2651 * standby.
2652 */
2653 {
2654 WalSnd *walsnd = MyWalSnd;
2655
2656 SpinLockAcquire(&walsnd->mutex);
2657 walsnd->replyTime = replyTime;
2658 SpinLockRelease(&walsnd->mutex);
2659 }
2660
2661 /*
2662 * Unset WalSender's xmins if the feedback message values are invalid.
2663 * This happens when the downstream turned hot_standby_feedback off.
2664 */
2665 if (!TransactionIdIsNormal(feedbackXmin)
2666 && !TransactionIdIsNormal(feedbackCatalogXmin))
2667 {
2669 if (MyReplicationSlot != NULL)
2670 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2671 return;
2672 }
2673
2674 /*
2675 * Check that the provided xmin/epoch are sane, that is, not in the future
2676 * and not so far back as to be already wrapped around. Ignore if not.
2677 */
2678 if (TransactionIdIsNormal(feedbackXmin) &&
2679 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2680 return;
2681
2682 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2683 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2684 return;
2685
2686 /*
2687 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2688 * the xmin will be taken into account by GetSnapshotData() /
2689 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2690 * thereby prevent the generation of cleanup conflicts on the standby
2691 * server.
2692 *
2693 * There is a small window for a race condition here: although we just
2694 * checked that feedbackXmin precedes nextXid, the nextXid could have
2695 * gotten advanced between our fetching it and applying the xmin below,
2696 * perhaps far enough to make feedbackXmin wrap around. In that case the
2697 * xmin we set here would be "in the future" and have no effect. No point
2698 * in worrying about this since it's too late to save the desired data
2699 * anyway. Assuming that the standby sends us an increasing sequence of
2700 * xmins, this could only happen during the first reply cycle, else our
2701 * own xmin would prevent nextXid from advancing so far.
2702 *
2703 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2704 * is assumed atomic, and there's no real need to prevent concurrent
2705 * horizon determinations. (If we're moving our xmin forward, this is
2706 * obviously safe, and if we're moving it backwards, well, the data is at
2707 * risk already since a VACUUM could already have determined the horizon.)
2708 *
2709 * If we're using a replication slot we reserve the xmin via that,
2710 * otherwise via the walsender's PGPROC entry. We can only track the
2711 * catalog xmin separately when using a slot, so we store the least of the
2712 * two provided when not using a slot.
2713 *
2714 * XXX: It might make sense to generalize the ephemeral slot concept and
2715 * always use the slot mechanism to handle the feedback xmin.
2716 */
2717 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2718 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2719 else
2720 {
2721 if (TransactionIdIsNormal(feedbackCatalogXmin)
2722 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2723 MyProc->xmin = feedbackCatalogXmin;
2724 else
2725 MyProc->xmin = feedbackXmin;
2726 }
2727}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
uint32_t uint32
Definition: c.h:541
uint32 TransactionId
Definition: c.h:660
bool message_level_is_interesting(int elevel)
Definition: elog.c:273
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void pfree(void *pointer)
Definition: mcxt.c:1594
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2533
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2582

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2360 of file walsender.c.

2361{
2362 char msgtype;
2363
2364 /*
2365 * Check message type from the first byte.
2366 */
2367 msgtype = pq_getmsgbyte(&reply_message);
2368
2369 switch (msgtype)
2370 {
2373 break;
2374
2377 break;
2378
2381 break;
2382
2383 default:
2385 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2386 errmsg("unexpected message type \"%c\"", msgtype)));
2387 proc_exit(0);
2388 }
2389}
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
#define PqReplMsg_PrimaryStatusRequest
Definition: protocol.h:83
#define PqReplMsg_HotStandbyFeedback
Definition: protocol.h:82
#define PqReplMsg_StandbyStatusUpdate
Definition: protocol.h:84
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2613
static void ProcessStandbyPSRequestMessage(void)
Definition: walsender.c:2733
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2428

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), PqReplMsg_HotStandbyFeedback, PqReplMsg_PrimaryStatusRequest, PqReplMsg_StandbyStatusUpdate, proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyPSRequestMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyPSRequestMessage()

static void ProcessStandbyPSRequestMessage ( void  )
static

Definition at line 2733 of file walsender.c.

2734{
2736 TransactionId oldestXidInCommit;
2737 TransactionId oldestGXidInCommit;
2738 FullTransactionId nextFullXid;
2739 FullTransactionId fullOldestXidInCommit;
2740 WalSnd *walsnd = MyWalSnd;
2741 TimestampTz replyTime;
2742
2743 /*
2744 * This shouldn't happen because we don't support getting primary status
2745 * message from standby.
2746 */
2747 if (RecoveryInProgress())
2748 elog(ERROR, "the primary status is unavailable during recovery");
2749
2750 replyTime = pq_getmsgint64(&reply_message);
2751
2752 /*
2753 * Update shared state for this WalSender process based on reply data from
2754 * standby.
2755 */
2756 SpinLockAcquire(&walsnd->mutex);
2757 walsnd->replyTime = replyTime;
2758 SpinLockRelease(&walsnd->mutex);
2759
2760 /*
2761 * Consider transactions in the current database, as only these are the
2762 * ones replicated.
2763 */
2764 oldestXidInCommit = GetOldestActiveTransactionId(true, false);
2765 oldestGXidInCommit = TwoPhaseGetOldestXidInCommit();
2766
2767 /*
2768 * Update the oldest xid for standby transmission if an older prepared
2769 * transaction exists and is currently in commit phase.
2770 */
2771 if (TransactionIdIsValid(oldestGXidInCommit) &&
2772 TransactionIdPrecedes(oldestGXidInCommit, oldestXidInCommit))
2773 oldestXidInCommit = oldestGXidInCommit;
2774
2775 nextFullXid = ReadNextFullTransactionId();
2776 fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
2777 oldestXidInCommit);
2778 lsn = GetXLogWriteRecPtr();
2779
2780 elog(DEBUG2, "sending primary status");
2781
2782 /* construct the message... */
2786 pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
2789
2790 /* ... and send it wrapped in CopyData */
2792}
int64_t int64
Definition: c.h:538
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition: procarray.c:2833
#define PqReplMsg_PrimaryStatusUpdate
Definition: protocol.h:76
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
Definition: transam.h:443
#define U64FromFullTransactionId(x)
Definition: transam.h:49
#define TransactionIdIsValid(xid)
Definition: transam.h:41
TransactionId TwoPhaseGetOldestXidInCommit(void)
Definition: twophase.c:2829
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9515

References StringInfoData::data, DEBUG2, elog, ERROR, FullTransactionIdFromAllowableAt(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), GetXLogWriteRecPtr(), InvalidXLogRecPtr, StringInfoData::len, WalSnd::mutex, MyWalSnd, output_message, pq_getmsgint64(), pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_PrimaryStatusUpdate, ReadNextFullTransactionId(), RecoveryInProgress(), reply_message, WalSnd::replyTime, resetStringInfo(), SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, TransactionIdPrecedes(), TwoPhaseGetOldestXidInCommit(), and U64FromFullTransactionId.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2428 of file walsender.c.

2429{
2430 XLogRecPtr writePtr,
2431 flushPtr,
2432 applyPtr;
2433 bool replyRequested;
2434 TimeOffset writeLag,
2435 flushLag,
2436 applyLag;
2437 bool clearLagTimes;
2439 TimestampTz replyTime;
2440
2441 static bool fullyAppliedLastTime = false;
2442
2443 /* the caller already consumed the msgtype byte */
2444 writePtr = pq_getmsgint64(&reply_message);
2445 flushPtr = pq_getmsgint64(&reply_message);
2446 applyPtr = pq_getmsgint64(&reply_message);
2447 replyTime = pq_getmsgint64(&reply_message);
2448 replyRequested = pq_getmsgbyte(&reply_message);
2449
2451 {
2452 char *replyTimeStr;
2453
2454 /* Copy because timestamptz_to_str returns a static buffer */
2455 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2456
2457 elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
2458 LSN_FORMAT_ARGS(writePtr),
2459 LSN_FORMAT_ARGS(flushPtr),
2460 LSN_FORMAT_ARGS(applyPtr),
2461 replyRequested ? " (reply requested)" : "",
2462 replyTimeStr);
2463
2464 pfree(replyTimeStr);
2465 }
2466
2467 /* See if we can compute the round-trip lag for these positions. */
2469 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2470 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2471 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2472
2473 /*
2474 * If the standby reports that it has fully replayed the WAL in two
2475 * consecutive reply messages, then the second such message must result
2476 * from wal_receiver_status_interval expiring on the standby. This is a
2477 * convenient time to forget the lag times measured when it last
2478 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2479 * until more WAL traffic arrives.
2480 */
2481 clearLagTimes = false;
2482 if (applyPtr == sentPtr)
2483 {
2484 if (fullyAppliedLastTime)
2485 clearLagTimes = true;
2486 fullyAppliedLastTime = true;
2487 }
2488 else
2489 fullyAppliedLastTime = false;
2490
2491 /* Send a reply if the standby requested one. */
2492 if (replyRequested)
2494
2495 /*
2496 * Update shared state for this WalSender process based on reply data from
2497 * standby.
2498 */
2499 {
2500 WalSnd *walsnd = MyWalSnd;
2501
2502 SpinLockAcquire(&walsnd->mutex);
2503 walsnd->write = writePtr;
2504 walsnd->flush = flushPtr;
2505 walsnd->apply = applyPtr;
2506 if (writeLag != -1 || clearLagTimes)
2507 walsnd->writeLag = writeLag;
2508 if (flushLag != -1 || clearLagTimes)
2509 walsnd->flushLag = flushLag;
2510 if (applyLag != -1 || clearLagTimes)
2511 walsnd->applyLag = applyLag;
2512 walsnd->replyTime = replyTime;
2513 SpinLockRelease(&walsnd->mutex);
2514 }
2515
2518
2519 /*
2520 * Advance our local xmin horizon when the client confirmed a flush.
2521 */
2522 if (MyReplicationSlot && XLogRecPtrIsValid(flushPtr))
2523 {
2526 else
2528 }
2529}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1820
#define SlotIsLogical(slot)
Definition: slot.h:285
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:474
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:173
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2395
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:4161
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4280

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsValid.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 482 of file walsender.c.

483{
484#define READ_REPLICATION_SLOT_COLS 3
485 ReplicationSlot *slot;
487 TupOutputState *tstate;
488 TupleDesc tupdesc;
490 bool nulls[READ_REPLICATION_SLOT_COLS];
491
493 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
494 TEXTOID, -1, 0);
495 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
496 TEXTOID, -1, 0);
497 /* TimeLineID is unsigned, so int4 is not wide enough. */
498 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
499 INT8OID, -1, 0);
500
501 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
502
503 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
504 slot = SearchNamedReplicationSlot(cmd->slotname, false);
505 if (slot == NULL || !slot->in_use)
506 {
507 LWLockRelease(ReplicationSlotControlLock);
508 }
509 else
510 {
511 ReplicationSlot slot_contents;
512 int i = 0;
513
514 /* Copy slot contents while holding spinlock */
515 SpinLockAcquire(&slot->mutex);
516 slot_contents = *slot;
517 SpinLockRelease(&slot->mutex);
518 LWLockRelease(ReplicationSlotControlLock);
519
520 if (OidIsValid(slot_contents.data.database))
522 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
523 errmsg("cannot use %s with a logical replication slot",
524 "READ_REPLICATION_SLOT"));
525
526 /* slot type */
527 values[i] = CStringGetTextDatum("physical");
528 nulls[i] = false;
529 i++;
530
531 /* start LSN */
532 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
533 {
534 char xloc[64];
535
536 snprintf(xloc, sizeof(xloc), "%X/%08X",
537 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
539 nulls[i] = false;
540 }
541 i++;
542
543 /* timeline this WAL was produced on */
544 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
545 {
546 TimeLineID slots_position_timeline;
547 TimeLineID current_timeline;
548 List *timeline_history = NIL;
549
550 /*
551 * While in recovery, use as timeline the currently-replaying one
552 * to get the LSN position's history.
553 */
554 if (RecoveryInProgress())
555 (void) GetXLogReplayRecPtr(&current_timeline);
556 else
557 current_timeline = GetWALInsertionTimeLine();
558
559 timeline_history = readTimeLineHistory(current_timeline);
560 slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
561 timeline_history);
562 values[i] = Int64GetDatum((int64) slots_position_timeline);
563 nulls[i] = false;
564 }
565 i++;
566
568 }
569
571 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
572 do_tup_output(tstate, values, nulls);
573 end_tup_output(tstate);
574}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
#define OidIsValid(objectId)
Definition: c.h:777
@ LW_SHARED
Definition: lwlock.h:113
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:546
Definition: pg_list.h:54
bool in_use
Definition: slot.h:186
#define READ_REPLICATION_SLOT_COLS

References Assert(), begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsValid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 581 of file walsender.c.

582{
583 DestReceiver *dest;
584 TupleDesc tupdesc;
585 StringInfoData buf;
586 char histfname[MAXFNAMELEN];
587 char path[MAXPGPATH];
588 int fd;
589 off_t histfilelen;
590 off_t bytesleft;
591 Size len;
592
593 dest = CreateDestReceiver(DestRemoteSimple);
594
595 /*
596 * Reply with a result set with one row, and two columns. The first col is
597 * the name of the history file, 2nd is the contents.
598 */
599 tupdesc = CreateTemplateTupleDesc(2);
600 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
601 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
602
603 TLHistoryFileName(histfname, cmd->timeline);
604 TLHistoryFilePath(path, cmd->timeline);
605
606 /* Send a RowDescription message */
607 dest->rStartup(dest, CMD_SELECT, tupdesc);
608
609 /* Send a DataRow message */
610 pq_beginmessage(&buf, PqMsg_DataRow);
611 pq_sendint16(&buf, 2); /* # of columns */
612 len = strlen(histfname);
613 pq_sendint32(&buf, len); /* col1 len */
614 pq_sendbytes(&buf, histfname, len);
615
616 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
617 if (fd < 0)
618 ereport(ERROR,
619 (errcode_for_file_access(),
620 errmsg("could not open file \"%s\": %m", path)));
621
622 /* Determine file length and send it to client */
623 histfilelen = lseek(fd, 0, SEEK_END);
624 if (histfilelen < 0)
625 ereport(ERROR,
626 (errcode_for_file_access(),
627 errmsg("could not seek to end of file \"%s\": %m", path)));
628 if (lseek(fd, 0, SEEK_SET) != 0)
629 ereport(ERROR,
630 (errcode_for_file_access(),
631 errmsg("could not seek to beginning of file \"%s\": %m", path)));
632
633 pq_sendint32(&buf, histfilelen); /* col2 len */
634
635 bytesleft = histfilelen;
636 while (bytesleft > 0)
637 {
638 PGAlignedBlock rbuf;
639 int nread;
640
641 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
642 nread = read(fd, rbuf.data, sizeof(rbuf));
643 pgstat_report_wait_end();
644 if (nread < 0)
645 ereport(ERROR,
646 (errcode_for_file_access(),
647 errmsg("could not read file \"%s\": %m",
648 path)));
649 else if (nread == 0)
650 ereport(ERROR,
651 (errcode(ERRCODE_DATA_CORRUPTED),
652 errmsg("could not read file \"%s\": read %d of %zu",
653 path, nread, (Size) bytesleft)));
654
655 pq_sendbytes(&buf, rbuf.data, nread);
656 bytesleft -= nread;
657 }
658
659 if (CloseTransientFile(fd) != 0)
660 ereport(ERROR,
661 (errcode_for_file_access(),
662 errmsg("could not close file \"%s\": %m", path)));
663
664 pq_endmessage(&buf);
665}
#define PG_BINARY
Definition: c.h:1261
size_t Size
Definition: c.h:613
int errcode_for_file_access(void)
Definition: elog.c:886
int CloseTransientFile(int fd)
Definition: fd.c:2868
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2691
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:275
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:42
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PqMsg_DataRow
Definition: protocol.h:43
char data[BLCKSZ]
Definition: c.h:1119
TimeLineID timeline
Definition: replnodes.h:120
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, generate_unaccent_rules::dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1451 of file walsender.c.

1452{
1454 QueryCompletion qc;
1455
1456 /* make sure that our requirements are still fulfilled */
1458
1460
1461 ReplicationSlotAcquire(cmd->slotname, true, true);
1462
1463 /*
1464 * Force a disconnect, so that the decoding code doesn't need to care
1465 * about an eventual switch from running in recovery, to running in a
1466 * normal environment. Client code is expected to handle reconnects.
1467 */
1469 {
1470 ereport(LOG,
1471 (errmsg("terminating walsender process after promotion")));
1472 got_STOPPING = true;
1473 }
1474
1475 /*
1476 * Create our decoding context, making it start at the previously ack'ed
1477 * position.
1478 *
1479 * Do this before sending a CopyBothResponse message, so that any errors
1480 * are reported early.
1481 */
1483 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1485 .segment_open = WalSndSegmentOpen,
1486 .segment_close = wal_segment_close),
1490
1492
1493 /* Send a CopyBothResponse message, and start streaming */
1495 pq_sendbyte(&buf, 0);
1496 pq_sendint16(&buf, 0);
1498 pq_flush();
1499
1500 /* Start reading WAL from the oldest required WAL. */
1503
1504 /*
1505 * Report the location after which we'll send out further commits as the
1506 * current sentPtr.
1507 */
1509
1510 /* Also update the sent position status in shared memory */
1514
1515 replication_active = true;
1516
1518
1519 /* Main loop of walsender */
1521
1524
1525 replication_active = false;
1526 if (got_STOPPING)
1527 proc_exit(0);
1529
1530 /* Get out of COPY mode (CommandComplete). */
1531 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1532 EndCommand(&qc, DestRemote, false);
1533}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:169
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:498
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:626
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:97
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2873
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:216
static void XLogSendLogical(void)
Definition: walsender.c:3495
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:232

References am_cascading_walsender, Assert(), buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 813 of file walsender.c.

814{
816 XLogRecPtr FlushPtr;
817 TimeLineID FlushTLI;
818
819 /* create xlogreader for physical replication */
820 xlogreader =
822 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
823 .segment_close = wal_segment_close),
824 NULL);
825
826 if (!xlogreader)
828 (errcode(ERRCODE_OUT_OF_MEMORY),
829 errmsg("out of memory"),
830 errdetail("Failed while allocating a WAL reading processor.")));
831
832 /*
833 * We assume here that we're logging enough information in the WAL for
834 * log-shipping, since this is checked in PostmasterMain().
835 *
836 * NOTE: wal_level can only change at shutdown, so in most cases it is
837 * difficult for there to be WAL data that we can still see that was
838 * written at wal_level='minimal'.
839 */
840
841 if (cmd->slotname)
842 {
843 ReplicationSlotAcquire(cmd->slotname, true, true);
846 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
847 errmsg("cannot use a logical replication slot for physical replication")));
848
849 /*
850 * We don't need to verify the slot's restart_lsn here; instead we
851 * rely on the caller requesting the starting point to use. If the
852 * WAL segment doesn't exist, we'll fail later.
853 */
854 }
855
856 /*
857 * Select the timeline. If it was given explicitly by the client, use
858 * that. Otherwise use the timeline of the last replayed record.
859 */
862 FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
863 else
864 FlushPtr = GetFlushRecPtr(&FlushTLI);
865
866 if (cmd->timeline != 0)
867 {
868 XLogRecPtr switchpoint;
869
870 sendTimeLine = cmd->timeline;
871 if (sendTimeLine == FlushTLI)
872 {
875 }
876 else
877 {
878 List *timeLineHistory;
879
881
882 /*
883 * Check that the timeline the client requested exists, and the
884 * requested start location is on that timeline.
885 */
886 timeLineHistory = readTimeLineHistory(FlushTLI);
887 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
889 list_free_deep(timeLineHistory);
890
891 /*
892 * Found the requested timeline in the history. Check that
893 * requested startpoint is on that timeline in our history.
894 *
895 * This is quite loose on purpose. We only check that we didn't
896 * fork off the requested timeline before the switchpoint. We
897 * don't check that we switched *to* it before the requested
898 * starting point. This is because the client can legitimately
899 * request to start replication from the beginning of the WAL
900 * segment that contains switchpoint, but on the new timeline, so
901 * that it doesn't end up with a partial segment. If you ask for
902 * too old a starting point, you'll get an error later when we
903 * fail to find the requested WAL segment in pg_wal.
904 *
905 * XXX: we could be more strict here and only allow a startpoint
906 * that's older than the switchpoint, if it's still in the same
907 * WAL segment.
908 */
909 if (XLogRecPtrIsValid(switchpoint) &&
910 switchpoint < cmd->startpoint)
911 {
913 errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
915 cmd->timeline),
916 errdetail("This server's history forked from timeline %u at %X/%08X.",
917 cmd->timeline,
918 LSN_FORMAT_ARGS(switchpoint)));
919 }
920 sendTimeLineValidUpto = switchpoint;
921 }
922 }
923 else
924 {
925 sendTimeLine = FlushTLI;
928 }
929
931
932 /* If there is nothing to stream, don't even enter COPY mode */
934 {
935 /*
936 * When we first start replication the standby will be behind the
937 * primary. For some applications, for example synchronous
938 * replication, it is important to have a clear state for this initial
939 * catchup mode, so we can trigger actions when we change streaming
940 * state later. We may stay in this state for a long time, which is
941 * exactly why we want to be able to monitor whether or not we are
942 * still here.
943 */
945
946 /* Send a CopyBothResponse message, and start streaming */
948 pq_sendbyte(&buf, 0);
949 pq_sendint16(&buf, 0);
951 pq_flush();
952
953 /*
954 * Don't allow a request to stream from a future point in WAL that
955 * hasn't been flushed to disk in this server yet.
956 */
957 if (FlushPtr < cmd->startpoint)
958 {
960 errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
962 LSN_FORMAT_ARGS(FlushPtr)));
963 }
964
965 /* Start streaming from the requested point */
966 sentPtr = cmd->startpoint;
967
968 /* Initialize shared memory status, too */
972
974
975 /* Main loop of walsender */
976 replication_active = true;
977
979
980 replication_active = false;
981 if (got_STOPPING)
982 proc_exit(0);
984
986 }
987
988 if (cmd->slotname)
990
991 /*
992 * Copy is finished now. Send a single-row result set indicating the next
993 * timeline.
994 */
996 {
997 char startpos_str[8 + 1 + 8 + 1];
999 TupOutputState *tstate;
1000 TupleDesc tupdesc;
1001 Datum values[2];
1002 bool nulls[2] = {0};
1003
1004 snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
1006
1008
1009 /*
1010 * Need a tuple descriptor representing two columns. int8 may seem
1011 * like a surprising data type for this, but in theory int4 would not
1012 * be wide enough for this, as TimeLineID is unsigned.
1013 */
1014 tupdesc = CreateTemplateTupleDesc(2);
1015 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1016 INT8OID, -1, 0);
1017 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1018 TEXTOID, -1, 0);
1019
1020 /* prepare for projection of tuple */
1021 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1022
1024 values[1] = CStringGetTextDatum(startpos_str);
1025
1026 /* send it to dest */
1027 do_tup_output(tstate, values, nulls);
1028
1029 end_tup_output(tstate);
1030 }
1031
1032 /* Send CommandComplete message */
1033 EndReplicationCommand("START_STREAMING");
1034}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1216
void list_free_deep(List *list)
Definition: list.c:1560
TimeLineID timeline
Definition: replnodes.h:96
static void XLogSendPhysical(void)
Definition: walsender.c:3185
int wal_segment_size
Definition: xlog.c:145
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:107

References am_cascading_walsender, Assert(), begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsValid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2582 of file walsender.c.

2583{
2584 FullTransactionId nextFullXid;
2585 TransactionId nextXid;
2586 uint32 nextEpoch;
2587
2588 nextFullXid = ReadNextFullTransactionId();
2589 nextXid = XidFromFullTransactionId(nextFullXid);
2590 nextEpoch = EpochFromFullTransactionId(nextFullXid);
2591
2592 if (xid <= nextXid)
2593 {
2594 if (epoch != nextEpoch)
2595 return false;
2596 }
2597 else
2598 {
2599 if (epoch + 1 != nextEpoch)
2600 return false;
2601 }
2602
2603 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2604 return false; /* epoch OK, but it's wrapped around */
2605
2606 return true;
2607}
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.h:282
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 671 of file walsender.c.

672{
673 MemoryContext mcxt;
675 off_t offset = 0;
677
678 /*
679 * parsing the manifest will use the cryptohash stuff, which requires a
680 * resource owner
681 */
684 CurrentResourceOwner == NULL);
686
687 /* Prepare to read manifest data into a temporary context. */
689 "incremental backup information",
692
693 /* Send a CopyInResponse message */
695 pq_sendbyte(&buf, 0);
696 pq_sendint16(&buf, 0);
698 pq_flush();
699
700 /* Receive packets from client until done. */
701 while (HandleUploadManifestPacket(&buf, &offset, ib))
702 ;
703
704 /* Finish up manifest processing. */
706
707 /*
708 * Discard any old manifest information and arrange to preserve the new
709 * information we just got.
710 *
711 * We assume that MemoryContextDelete and MemoryContextSetParent won't
712 * fail, and thus we shouldn't end up bailing out of here in such a way as
713 * to leave dangling pointers.
714 */
715 if (uploaded_manifest_mcxt != NULL)
720
721 /* clean up the resource owner we created */
723}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:683
MemoryContext CacheMemoryContext
Definition: mcxt.c:169
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:314
#define PqMsg_CopyInResponse
Definition: protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1016
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition: resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:737
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:156

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2846 of file walsender.c.

2847{
2848 TimestampTz timeout;
2849
2850 /* don't bail out if we're doing something that doesn't require timeouts */
2851 if (last_reply_timestamp <= 0)
2852 return;
2853
2854 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2855 wal_sender_timeout);
2856
2857 if (wal_sender_timeout > 0 && last_processing >= timeout)
2858 {
2859 /*
2860 * Since typically expiration of replication timeout means
2861 * communication problem, we don't send the error message to the
2862 * standby.
2863 */
2864 ereport(COMMERROR,
2865 (errmsg("terminating walsender process due to replication timeout")));
2866
2867 WalSndShutdown();
2868 }
2869}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:131

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2802 of file walsender.c.

2803{
2804 long sleeptime = 10000; /* 10 s */
2805
2806 if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2807 {
2808 TimestampTz wakeup_time;
2809
2810 /*
2811 * At the latest stop sleeping once wal_sender_timeout has been
2812 * reached.
2813 */
2814 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2815 wal_sender_timeout);
2816
2817 /*
2818 * If no ping has been sent yet, wakeup when it's time to do so.
2819 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2820 * the timeout passed without a response.
2821 */
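2822 if (!waiting_for_ping_response)
2823 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2824 wal_sender_timeout / 2);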
2825
2826 /* Compute relative time until wakeup. */
2827 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2828 }
2829
2830 return sleeptime;
2831}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3588 of file walsender.c.

3589{
3590 XLogRecPtr replicatedPtr;
3591
3592 /* ... let's just be real sure we're caught up ... */
3593 send_data();
3594
3595 /*
3596 * To figure out whether all WAL has successfully been replicated, check
3597 * flush location if valid, write otherwise. Tools like pg_receivewal will
3598 * usually (unless in synchronous mode) return an invalid flush location.
3599 */
3600 replicatedPtr = XLogRecPtrIsValid(MyWalSnd->flush) ?
3601 MyWalSnd->flush : MyWalSnd->write;
3602
3603 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3604 !pq_is_send_pending())
3605 {
3606 QueryCompletion qc;
3607
3608 /* Inform the standby that XLOG streaming is done */
3609 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3610 EndCommand(&qc, DestRemote, false);
3611 pq_flush();
3612
3613 proc_exit(0);
3614 }
3615 if (!waiting_for_ping_response)
3616 WalSndKeepalive(true, InvalidXLogRecPtr);
3617}
static bool WalSndCaughtUp
Definition: walsender.c:202

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsValid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 348 of file walsender.c.

349{
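350 LWLockReleaseAll();
351 ConditionVariableCancelSleep();
352 pgstat_report_wait_end();
353 pgaio_error_cleanup();
354
355 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
356 wal_segment_close(xlogreader);
357
358 if (MyReplicationSlot != NULL)
359 ReplicationSlotRelease();
360
361 ReplicationSlotCleanup(false);
362
363 replication_active = false;
364
365 /*
366 * If there is a transaction in progress, it will clean up our
367 * ResourceOwner, but if a replication command set up a resource owner
368 * without a transaction, we've got to clean that up now.
369 */
370 if (!IsTransactionOrTransactionBlock())
371 ReleaseAuxProcessResources(false);
372
373 if (got_STOPPING || got_SIGUSR2)
374 proc_exit(0);
375
376 /* Revert back to startup state */
377 WalSndSetState(WALSNDSTATE_STARTUP);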
378}
void pgaio_error_cleanup(void)
Definition: aio.c:1165
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1945
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:853
WALOpenSegment seg
Definition: xlogreader.h:271
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:205
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5007

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 3955 of file walsender.c.

3956{
3957 switch (state)
3958 {
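3959 case WALSNDSTATE_STARTUP:
3960 return "startup";
3961 case WALSNDSTATE_BACKUP:
3962 return "backup";
3963 case WALSNDSTATE_CATCHUP:
3964 return "catchup";
3965 case WALSNDSTATE_STREAMING:
3966 return "streaming";
3967 case WALSNDSTATE_STOPPING:
3968 return "stopping";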
3969 }
3970 return "UNKNOWN";
3971}
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3872 of file walsender.c.

3873{
3874 int i;
3875
3876 for (i = 0; i < max_wal_senders; i++)
3877 {
3878 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3879 pid_t pid;
3880
3881 SpinLockAcquire(&walsnd->mutex);
3882 pid = walsnd->pid;
3883 SpinLockRelease(&walsnd->mutex);
3884
3885 if (pid == 0)
3886 continue;
3887
3888 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, INVALID_PROC_NUMBER);
3889 }
3890}
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 4161 of file walsender.c.

4162{
4163 elog(DEBUG2, "sending replication keepalive");
4164
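4165 /* construct the message... */
4166 resetStringInfo(&output_message);
4167 pq_sendbyte(&output_message, PqReplMsg_Keepalive);
4168 pq_sendint64(&output_message, XLogRecPtrIsValid(writePtr) ? writePtr : sentPtr);
4169 pq_sendint64(&output_message, GetCurrentTimestamp());
4170 pq_sendbyte(&output_message, requestReply ? 1 : 0);
4171
4172 /* ... and send it wrapped in CopyData */
4173 pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
4174
4175 /* Set local flag */
4176 if (requestReply)
4177 waiting_for_ping_response = true;
4178}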
#define PqReplMsg_Keepalive
Definition: protocol.h:75

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_Keepalive, resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4184 of file walsender.c.

4185{
4186 TimestampTz ping_time;
4187
4188 /*
4189 * Don't send keepalive messages if timeouts are globally disabled or
4190 * we're doing something not partaking in timeouts.
4191 */
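4192 if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
4193 return;
4194
4195 if (waiting_for_ping_response)
4196 return;
4197
4198 /*
4199 * If half of wal_sender_timeout has lapsed without receiving any reply
4200 * from the standby, send a keep-alive message to the standby requesting
4201 * an immediate reply.
4202 */
4203 ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
4204 wal_sender_timeout / 2);
4205 if (last_processing >= ping_time)
4206 {
4207 WalSndKeepalive(true, InvalidXLogRecPtr);
4208
4209 /* Try to flush pending output to the client */
4210 if (pq_flush_if_writable() != 0)
4211 WalSndShutdown();
4212 }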
4213}

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3091 of file walsender.c.

3092{
3093 WalSnd *walsnd = MyWalSnd;
3094
3095 Assert(walsnd != NULL);
3096
3097 MyWalSnd = NULL;
3098
3099 SpinLockAcquire(&walsnd->mutex);
3100 /* Mark WalSnd struct as no longer being in use. */
3101 walsnd->pid = 0;
3102 SpinLockRelease(&walsnd->mutex);
3103}

References Assert(), WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3709 of file walsender.c.

3710{
3711 got_SIGUSR2 = true;
3712 SetLatch(MyLatch);
3713}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2873 of file walsender.c.

2874{
2875 TimestampTz last_flush = 0;
2876
2877 /*
2878 * Initialize the last reply timestamp. That enables timeout processing
2879 * from hereon.
2880 */
2881 last_reply_timestamp = GetCurrentTimestamp();
2882 waiting_for_ping_response = false;
2883
2884 /*
2885 * Loop until we reach the end of this timeline or the client requests to
2886 * stop streaming.
2887 */
2888 for (;;)
2889 {
2890 /* Clear any already-pending wakeups */
2891 ResetLatch(MyLatch);
2892
2893 CHECK_FOR_INTERRUPTS();
2894
2895 /* Process any requests or signals received recently */
2896 if (ConfigReloadPending)
2897 {
2898 ConfigReloadPending = false;
2899 ProcessConfigFile(PGC_SIGHUP);
2900 SyncRepInitConfig();
2901 }
2902
2903 /* Check for input from the client */
2904 ProcessRepliesIfAny();
2905
2906 /*
2907 * If we have received CopyDone from the client, sent CopyDone
2908 * ourselves, and the output buffer is empty, it's time to exit
2909 * streaming.
2910 */
2911 if (streamingDoneReceiving && streamingDoneSending &&
2912 !pq_is_send_pending())
2913 break;
2914
2915 /*
2916 * If we don't have any pending data in the output buffer, try to send
2917 * some more. If there is some, we don't bother to call send_data
2918 * again until we've flushed it ... but we'd better assume we are not
2919 * caught up.
2920 */
2921 if (!pq_is_send_pending())
2922 send_data();
2923 else
2924 WalSndCaughtUp = false;
2925
2926 /* Try to flush pending output to the client */
2927 if (pq_flush_if_writable() != 0)
2928 WalSndShutdown();
2929
2930 /* If nothing remains to be sent right now ... */
2931 if (WalSndCaughtUp && !pq_is_send_pending())
2932 {
2933 /*
2934 * If we're in catchup state, move to streaming. This is an
2935 * important state change for users to know about, since before
2936 * this point data loss might occur if the primary dies and we
2937 * need to failover to the standby. The state change is also
2938 * important for synchronous replication, since commits that
2939 * started to wait at that point might wait for some time.
2940 */
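2941 if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2942 {
2943 ereport(DEBUG1,
2944 (errmsg_internal("\"%s\" has now caught up with upstream server",
2945 application_name)));
2946 WalSndSetState(WALSNDSTATE_STREAMING);
2947 }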
2948
2949 /*
2950 * When SIGUSR2 arrives, we send any outstanding logs up to the
2951 * shutdown checkpoint record (i.e., the latest record), wait for
2952 * them to be replicated to the standby, and exit. This may be a
2953 * normal termination at shutdown, or a promotion, the walsender
2954 * is not sure which.
2955 */
2956 if (got_SIGUSR2)
2957 WalSndDone(send_data);
2958 }
2959
2960 /* Check for replication timeout. */
2961 WalSndCheckTimeOut();
2962
2963 /* Send keepalive if the time has come */
2964 WalSndKeepaliveIfNecessary();
2965
2966 /*
2967 * Block if we have unsent data. XXX For logical replication, let
2968 * WalSndWaitForWal() handle any other blocking; idle receivers need
2969 * its additional actions. For physical replication, also block if
2970 * caught up; its send_data does not block.
2971 *
2972 * The IO statistics are reported in WalSndWaitForWal() for the
2973 * logical WAL senders.
2974 */
2975 if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2976 !streamingDoneSending) ||
2977 pq_is_send_pending())
2978 {
2979 long sleeptime;
2980 int wakeEvents;
2981 TimestampTz now;
2982
2983 if (!streamingDoneReceiving)
2984 wakeEvents = WL_SOCKET_READABLE;
2985 else
2986 wakeEvents = 0;
2987
2988 /*
2989 * Use fresh timestamp, not last_processing, to reduce the chance
2990 * of reaching wal_sender_timeout before sending a keepalive.
2991 */
2992 now = GetCurrentTimestamp();
2993 sleeptime = WalSndComputeSleeptime(now);
2994
2995 if (pq_is_send_pending())
2996 wakeEvents |= WL_SOCKET_WRITEABLE;
2997
2998 /* Report IO statistics, if needed */
2999 if (TimestampDifferenceExceeds(last_flush, now,
3000 WALSENDER_STATS_FLUSH_INTERVAL))
3001 {
3002 pgstat_flush_io(false);
3003 (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
3004 last_flush = now;
3005 }
3006
3007 /* Sleep until something happens or we time out */
3008 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
3009 }
3010 }
3011}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
char * application_name
Definition: guc_tables.c:561
bool pgstat_flush_backend(bool nowait, bits32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition: pgstat_io.c:175
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition: walsender.c:103
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3588

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1544 of file walsender.c.

1545{
1546 /* can't have sync rep confused by sending the same LSN several times */
1547 if (!last_write)
1548 lsn = InvalidXLogRecPtr;
1549
1550 resetStringInfo(ctx->out);
1551
1552 pq_sendbyte(ctx->out, PqReplMsg_WALData);
1553 pq_sendint64(ctx->out, lsn); /* dataStart */
1554 pq_sendint64(ctx->out, lsn); /* walEnd */
1555
1556 /*
1557 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1558 * reserve space here.
1559 */
1560 pq_sendint64(ctx->out, 0); /* sendtime */
1561}
#define PqReplMsg_WALData
Definition: protocol.h:77
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), PqReplMsg_WALData, and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3664 of file walsender.c.

3665{
3666 int i;
3667
3668 for (i = 0; i < max_wal_senders; i++)
3669 {
3670 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3671
3672 SpinLockAcquire(&walsnd->mutex);
3673 if (walsnd->pid == 0)
3674 {
3675 SpinLockRelease(&walsnd->mutex);
3676 continue;
3677 }
3678 walsnd->needreload = true;
3679 SpinLockRelease(&walsnd->mutex);
3680 }
3681}

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 3107 of file walsender.c.

3109{
3110 char path[MAXPGPATH];
3111
3112 /*-------
3113 * When reading from a historic timeline, and there is a timeline switch
3114 * within this segment, read from the WAL segment belonging to the new
3115 * timeline.
3116 *
3117 * For example, imagine that this server is currently on timeline 5, and
3118 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3119 * 0/13002088. In pg_wal, we have these files:
3120 *
3121 * ...
3122 * 000000040000000000000012
3123 * 000000040000000000000013
3124 * 000000050000000000000013
3125 * 000000050000000000000014
3126 * ...
3127 *
3128 * In this situation, when requested to send the WAL from segment 0x13, on
3129 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3130 * recovery prefers files from newer timelines, so if the segment was
3131 * restored from the archive on this server, the file belonging to the old
3132 * timeline, 000000040000000000000013, might not exist. Their contents are
3133 * equal up to the switchpoint, because at a timeline switch, the used
3134 * portion of the old segment is copied to the new file.
3135 */
3136 *tli_p = sendTimeLine;
3137 if (sendTimeLineIsHistoric)
3138 {
3139 XLogSegNo endSegNo;
3140
3141 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3142 if (nextSegNo == endSegNo)
3143 *tli_p = sendTimeLineNextTLI;
3144 }
3145
3146 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3147 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3148 if (state->seg.ws_file >= 0)
3149 return;
3150
3151 /*
3152 * If the file is not found, assume it's because the standby asked for a
3153 * too old WAL segment that has already been removed or recycled.
3154 */
3155 if (errno == ENOENT)
3156 {
3157 char xlogfname[MAXFNAMELEN];
3158 int save_errno = errno;
3159
3160 XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
3161 errno = save_errno;
3162 ereport(ERROR,
3163 (errcode_for_file_access(),
3164 errmsg("requested WAL segment %s has already been removed",
3165 xlogfname)));
3166 }
3167 else
3168 ereport(ERROR,
3169 (errcode_for_file_access(),
3170 errmsg("could not open file \"%s\": %m",
3171 path)));
3172}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1086
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3936 of file walsender.c.

3937{
3938 WalSnd *walsnd = MyWalSnd;
3939
3940 Assert(am_walsender);
3941
3942 if (walsnd->state == state)
3943 return;
3944
3945 SpinLockAcquire(&walsnd->mutex);
3946 walsnd->state = state;
3947 SpinLockRelease(&walsnd->mutex);
3948}

References am_walsender, Assert(), WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3748 of file walsender.c.

3749{
3750 bool found;
3751 int i;
3752
3753 WalSndCtl = (WalSndCtlData *)
3754 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3755
3756 if (!found)
3757 {
3758 /* First time through, so initialize */
3759 MemSet(WalSndCtl, 0, WalSndShmemSize());
3760
3761 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3762 dlist_init(&(WalSndCtl->SyncRepQueue[i]));
3763
3764 for (i = 0; i < max_wal_senders; i++)
3765 {
3766 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3767
3768 SpinLockInit(&walsnd->mutex);
3769 }
3770
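3771 ConditionVariableInit(&WalSndCtl->wal_flush_cv);
3772 ConditionVariableInit(&WalSndCtl->wal_replay_cv);
3773 ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv);
3774 }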
3775}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3736

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3736 of file walsender.c.

3737{
3738 Size size = 0;
3739
3740 size = offsetof(WalSndCtlData, walsnds);
3741 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3742
3743 return size;
3744}
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 384 of file walsender.c.

385{
386 /*
387 * Reset whereToSendOutput to prevent ereport from attempting to send any
388 * more messages to the standby.
389 */
390 if (whereToSendOutput == DestRemote)
391 whereToSendOutput = DestNone;
392
393 proc_exit(0);
394 abort(); /* keep the compiler quiet */
395}
@ DestNone
Definition: dest.h:87
CommandDest whereToSendOutput
Definition: postgres.c:92

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3717 of file walsender.c.

3718{
3719 /* Set up signal handlers */
3720 pqsignal(SIGHUP, SignalHandlerForConfigReload);
3721 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3722 pqsignal(SIGTERM, die); /* request shutdown */
3723 /* SIGQUIT handler was already set up by InitPostmasterChild */
3724 InitializeTimeouts(); /* establishes SIGALRM handler */
3725 pqsignal(SIGPIPE, SIG_IGN);
3726 pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3727 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3728 * shutdown */
3729
3730 /* Reset some signals that are accepted by postmaster but not here */
3731 pqsignal(SIGCHLD, SIG_DFL);
3732}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
#define pqsignal
Definition: port.h:552
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3062
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:674
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3709
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1667 of file walsender.c.

1669{
1670 static TimestampTz sendTime = 0;
1671 TimestampTz now = GetCurrentTimestamp();
1672 bool pending_writes = false;
1673 bool end_xact = ctx->end_xact;
1674
1675 /*
1676 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1677 * avoid flooding the lag tracker when we commit frequently.
1678 *
1679 * We don't have a mechanism to get the ack for any LSN other than end
1680 * xact LSN from the downstream. So, we track lag only for end of
1681 * transaction LSN.
1682 */
1683#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1684 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1685 WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1686 {
1687 LagTrackerWrite(lsn, now);
1688 sendTime = now;
1689 }
1690
1691 /*
1692 * When skipping empty transactions in synchronous replication, we send a
1693 * keepalive message to avoid delaying such transactions.
1694 *
1695 * It is okay to check sync_standbys_status without lock here as in the
1696 * worst case we will just send an extra keepalive message when it is
1697 * really not required.
1698 */
1699 if (skipped_xact &&
1700 SyncRepRequested() &&
1701 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1702 {
1703 WalSndKeepalive(false, lsn);
1704
1705 /* Try to flush pending output to the client */
1706 if (pq_flush_if_writable() != 0)
1707 WalSndShutdown();
1708
1709 /* If we have pending write here, make sure it's actually flushed */
1710 if (pq_is_send_pending())
1711 pending_writes = true;
1712 }
1713
1714 /*
1715 * Process pending writes if any or try to send a keepalive if required.
1716 * We don't need to try sending keep alive messages at the transaction end
1717 * as that will be done at a later point in time. This is required only
1718 * for large transactions where we don't send any changes to the
1719 * downstream and the receiver can timeout due to that.
1720 */
1721 if (pending_writes || (!end_xact &&
1722 now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1723 wal_sender_timeout / 2)))
1724 ProcessPendingWrites();
1725}
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1613
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:4222
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3814 of file walsender.c.

3815{
3816 WaitEvent event;
3817
3818 ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3819
3820 /*
3821 * We use a condition variable to efficiently wake up walsenders in
3822 * WalSndWakeup().
3823 *
3824 * Every walsender prepares to sleep on a shared memory CV. Note that it
3825 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3826 * waitlist), but does not actually wait on the CV (IOW, it never calls
3827 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3828 * waiting, because we also need to wait for socket events. The processes
3829 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3830 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3831 * walsenders come out of WaitEventSetWait().
3832 *
3833 * This approach is simple and efficient because, one doesn't have to loop
3834 * through all the walsenders slots, with a spinlock acquisition and
3835 * release for every iteration, just to wake up only the waiting
3836 * walsenders. It makes WalSndWakeup() callers' life easy.
3837 *
3838 * XXX: A desirable future improvement would be to add support for CVs
3839 * into WaitEventSetWait().
3840 *
3841 * And, we use separate shared memory CVs for physical and logical
3842 * walsenders for selective wake ups, see WalSndWakeup() for more details.
3843 *
3844 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3845 * until awakened by physical walsenders after the walreceiver confirms
3846 * the receipt of the LSN.
3847 */
3848 if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
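3849 ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3850 else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
3851 ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
3852 else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
3853 ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
3854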
3855 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3856 (event.events & WL_POSTMASTER_DEATH))
3857 {
3858 ConditionVariableCancelSleep();
3859 proc_exit(1);
3860 }
3861
3862 ConditionVariableCancelSleep();
3863}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:166
uint32 events
Definition: waiteventset.h:62
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: waiteventset.c:656
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
Definition: waiteventset.h:38

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1817 of file walsender.c.

1818{
1819 int wakeEvents;
1820 uint32 wait_event = 0;
1821 static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1822 TimestampTz last_flush = 0;
1823
1824 /*
1825 * Fast path to avoid acquiring the spinlock in case we already know we
1826 * have enough WAL available and all the standby servers have confirmed
1827 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1828 * if we're far behind.
1829 */
1830 if (XLogRecPtrIsValid(RecentFlushPtr) &&
1831 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1832 return RecentFlushPtr;
1833
1834 /*
1835 * Within the loop, we wait for the necessary WALs to be flushed to disk
1836 * first, followed by waiting for standbys to catch up if there are enough
1837 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1838 */
1839 for (;;)
1840 {
1841 bool wait_for_standby_at_stop = false;
1842 long sleeptime;
1843 TimestampTz now;
1844
1845 /* Clear any already-pending wakeups */
1846 ResetLatch(MyLatch);
1847
1848 CHECK_FOR_INTERRUPTS();
1849
1850 /* Process any requests or signals received recently */
1851 if (ConfigReloadPending)
1852 {
1853 ConfigReloadPending = false;
1854 ProcessConfigFile(PGC_SIGHUP);
1855 SyncRepInitConfig();
1856 }
1857
1858 /* Check for input from the client */
1859 ProcessRepliesIfAny();
1860
1861 /*
1862 * If we're shutting down, trigger pending WAL to be written out,
1863 * otherwise we'd possibly end up waiting for WAL that never gets
1864 * written, because walwriter has shut down already.
1865 */
1866 if (got_STOPPING)
1867 XLogBackgroundFlush();
1868
1869 /*
1870 * To avoid the scenario where standbys need to catch up to a newer
1871 * WAL location in each iteration, we update our idea of the currently
1872 * flushed position only if we are not waiting for standbys to catch
1873 * up.
1874 */
1875 if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1876 {
1877 if (!RecoveryInProgress())
1878 RecentFlushPtr = GetFlushRecPtr(NULL);
1879 else
1880 RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1881 }
1882
1883 /*
1884 * If postmaster asked us to stop and the standby slots have caught up
1885 * to the flushed position, don't wait anymore.
1886 *
1887 * It's important to do this check after the recomputation of
1888 * RecentFlushPtr, so we can send all remaining data before shutting
1889 * down.
1890 */
1891 if (got_STOPPING)
1892 {
1893 if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
1894 wait_for_standby_at_stop = true;
1895 else
1896 break;
1897 }
1898
1899 /*
1900 * We only send regular messages to the client for full decoded
1901 * transactions, but a synchronous replication and walsender shutdown
1902 * possibly are waiting for a later location. So, before sleeping, we
1903 * send a ping containing the flush location. If the receiver is
1904 * otherwise idle, this keepalive will trigger a reply. Processing the
1905 * reply will update these MyWalSnd locations.
1906 */
1907 if (MyWalSnd->flush < sentPtr &&
1908 MyWalSnd->write < sentPtr &&
1909 !waiting_for_ping_response)
1910 WalSndKeepalive(false, InvalidXLogRecPtr);
1911
1912 /*
1913 * Exit the loop if already caught up and doesn't need to wait for
1914 * standby slots.
1915 */
1916 if (!wait_for_standby_at_stop &&
1917 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1918 break;
1919
1920 /*
1921 * Waiting for new WAL or waiting for standbys to catch up. Since we
1922 * need to wait, we're now caught up.
1923 */
1924 WalSndCaughtUp = true;
1925
1926 /*
1927 * Try to flush any pending output to the client.
1928 */
1929 if (pq_flush_if_writable() != 0)
1930 WalSndShutdown();
1931
1932 /*
1933 * If we have received CopyDone from the client, sent CopyDone
1934 * ourselves, and the output buffer is empty, it's time to exit
1935 * streaming, so fail the current WAL fetch request.
1936 */
1937 if (streamingDoneReceiving && streamingDoneSending &&
1938 !pq_is_send_pending())
1939 break;
1940
1941 /* die if timeout was reached */
1942 WalSndCheckTimeOut();
1943
1944 /* Send keepalive if the time has come */
1945 WalSndKeepaliveIfNecessary();
1946
1947 /*
1948 * Sleep until something happens or we time out. Also wait for the
1949 * socket becoming writable, if there's still pending output.
1950 * Otherwise we might sit on sendable output data while waiting for
1951 * new WAL to be generated. (But if we have nothing to send, we don't
1952 * want to wake on socket-writable.)
1953 */
1954 now = GetCurrentTimestamp();
1955 sleeptime = WalSndComputeSleeptime(now);
1956
1957 wakeEvents = WL_SOCKET_READABLE;
1958
1959 if (pq_is_send_pending())
1960 wakeEvents |= WL_SOCKET_WRITEABLE;
1961
1962 Assert(wait_event != 0);
1963
1964 /* Report IO statistics, if needed */
1965 if (TimestampDifferenceExceeds(last_flush, now,
1966 WALSENDER_STATS_FLUSH_INTERVAL))
1967 {
1968 pgstat_flush_io(false);
1969 (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
1970 last_flush = now;
1971 }
1972
1973 WalSndWait(wakeEvents, sleeptime, wait_event);
1974 }
1975
1976 /* reactivate latch so WalSndLoop knows to continue */
1977 SetLatch(MyLatch);
1978 return RecentFlushPtr;
1979}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1789
bool XLogBackgroundFlush(void)
Definition: xlog.c:2978

References Assert(), CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGC_SIGHUP, PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsValid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3898 of file walsender.c.

3899{
3900 for (;;)
3901 {
3902 int i;
3903 bool all_stopped = true;
3904
3905 for (i = 0; i < max_wal_senders; i++)
3906 {
3907 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3908
3909 SpinLockAcquire(&walsnd->mutex);
3910
3911 if (walsnd->pid == 0)
3912 {
3913 SpinLockRelease(&walsnd->mutex);
3914 continue;
3915 }
3916
3917 if (walsnd->state != WALSNDSTATE_STOPPING)
3918 {
3919 all_stopped = false;
3920 SpinLockRelease(&walsnd->mutex);
3921 break;
3922 }
3923 SpinLockRelease(&walsnd->mutex);
3924 }
3925
3926 /* safe to leave if confirmation is done for all WAL senders */
3927 if (all_stopped)
3928 return;
3929
3930 pg_usleep(10000L); /* wait for 10 msec */
3931 }
3932}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3793 of file walsender.c.

3794{
3795 /*
3796 * Wake up all the walsenders waiting on WAL being flushed or replayed
3797 * respectively. Note that waiting walsender would have prepared to sleep
3798 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3799 * before actually waiting.
3800 */
3801 if (physical)
3802 ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv);
3803
3804 if (logical)
3805 ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv);
3806}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1571 of file walsender.c.

1573{
1574 TimestampTz now;
1575
1576 /*
1577 * Fill the send timestamp last, so that it is taken as late as possible.
1578 * This is somewhat ugly, but the protocol is set as it's already used for
1579 * several releases by streaming physical replication.
1580 */
1581 resetStringInfo(&tmpbuf);
1582 now = GetCurrentTimestamp();
1583 pq_sendint64(&tmpbuf, now);
1584 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1585 tmpbuf.data, sizeof(int64));
1586
1587 /* output previously gathered data in a CopyData packet */
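1588 pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
1589
1590 CHECK_FOR_INTERRUPTS();
1591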
1592 /* Try to flush pending output to the client */
1593 if (pq_flush_if_writable() != 0)
1594 WalSndShutdown();
1595
1596 /* Try taking fast path unless we get too close to walsender timeout. */
1597 if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1598 wal_sender_timeout / 2) &&
1599 !pq_is_send_pending())
1600 {
1601 return;
1602 }
1603
1604 /* If we have pending write here, go to slow path */
1605 ProcessPendingWrites();
1606}

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), PqMsg_CopyData, ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3495 of file walsender.c.

3496{
3497 XLogRecord *record;
3498 char *errm;
3499
3500 /*
3501 * We'll use the current flush point to determine whether we've caught up.
3502 * This variable is static in order to cache it across calls. Caching is
3503 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3504 * spinlock.
3505 */
3506 static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3507
3508 /*
3509 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3510 * true in WalSndWaitForWal, if we're actually waiting. We also set to
3511 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3512 * didn't wait - i.e. when we're shutting down.
3513 */
3514 WalSndCaughtUp = false;
3515
3516 record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3517
3518 /* xlog record was invalid */
3519 if (errm != NULL)
3520 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3521 errm);
3522
3523 if (record != NULL)
3524 {
3525 /*
3526 * Note the lack of any call to LagTrackerWrite() which is handled by
3527 * WalSndUpdateProgress which is called by output plugin through
3528 * logical decoding write api.
3529 */
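3530 LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
3531
3532 sentPtr = logical_decoding_ctx->reader->EndRecPtr;
3533 }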
3534
3535 /*
3536 * If first time through in this session, initialize flushPtr. Otherwise,
3537 * we only need to update flushPtr if EndRecPtr is past it.
3538 */
3539 if (!XLogRecPtrIsValid(flushPtr) ||
3540 logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3541 {
3542 /*
3543 * For cascading logical WAL senders, we use the replay LSN instead of
3544 * the flush LSN, since logical decoding on a standby only processes
3545 * WAL that has been replayed. This distinction becomes particularly
3546 * important during shutdown, as new WAL is no longer replayed and the
3547 * last replayed LSN marks the furthest point up to which decoding can
3548 * proceed.
3549 */
3550 if (am_cascading_walsender)
3551 flushPtr = GetXLogReplayRecPtr(NULL);
3552 else
3553 flushPtr = GetFlushRecPtr(NULL);
3554 }
3555
3556 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3557 if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3558 WalSndCaughtUp = true;
3559
3560 /*
3561 * If we're caught up and have been requested to stop, have WalSndLoop()
3562 * terminate the connection in an orderly manner, after writing out all
3563 * the pending data.
3564 */
3565 if (WalSndCaughtUp && got_STOPPING)
3566 got_SIGUSR2 = true;
3567
3568 /* Update shared memory status */
3569 {
3570 WalSnd *walsnd = MyWalSnd;
3571
3572 SpinLockAcquire(&walsnd->mutex);
3573 walsnd->sentPtr = sentPtr;
3574 SpinLockRelease(&walsnd->mutex);
3575 }
3576}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:88
XLogRecPtr EndRecPtr
Definition: xlogreader.h:206
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:390

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3185 of file walsender.c.

3186{
3187 XLogRecPtr SendRqstPtr;
3188 XLogRecPtr startptr;
3189 XLogRecPtr endptr;
3190 Size nbytes;
3191 XLogSegNo segno;
3192 WALReadError errinfo;
3193 Size rbytes;
3194
3195 /* If requested switch the WAL sender to the stopping state. */
3196 if (got_STOPPING)
3198
3200 {
3201 WalSndCaughtUp = true;
3202 return;
3203 }
3204
3205 /* Figure out how far we can safely send the WAL. */
3207 {
3208 /*
3209 * Streaming an old timeline that's in this server's history, but is
3210 * not the one we're currently inserting or replaying. It can be
3211 * streamed up to the point where we switched off that timeline.
3212 */
3213 SendRqstPtr = sendTimeLineValidUpto;
3214 }
3215 else if (am_cascading_walsender)
3216 {
3217 TimeLineID SendRqstTLI;
3218
3219 /*
3220 * Streaming the latest timeline on a standby.
3221 *
3222 * Attempt to send all WAL that has already been replayed, so that we
3223 * know it's valid. If we're receiving WAL through streaming
3224 * replication, it's also OK to send any WAL that has been received
3225 * but not replayed.
3226 *
3227 * The timeline we're recovering from can change, or we can be
3228 * promoted. In either case, the current timeline becomes historic. We
3229 * need to detect that so that we don't try to stream past the point
3230 * where we switched to another timeline. We check for promotion or
3231 * timeline switch after calculating FlushPtr, to avoid a race
3232 * condition: if the timeline becomes historic just after we checked
3233 * that it was still current, it's still OK to stream it up to the
3234 * FlushPtr that was calculated before it became historic.
3235 */
3236 bool becameHistoric = false;
3237
3238 SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3239
3240 if (!RecoveryInProgress())
3241 {
3242 /* We have been promoted. */
3243 SendRqstTLI = GetWALInsertionTimeLine();
3244 am_cascading_walsender = false;
3245 becameHistoric = true;
3246 }
3247 else
3248 {
3249 /*
3250 * Still a cascading standby. But is the timeline we're sending
3251 * still the one recovery is recovering from?
3252 */
3253 if (sendTimeLine != SendRqstTLI)
3254 becameHistoric = true;
3255 }
3256
3257 if (becameHistoric)
3258 {
3259 /*
3260 * The timeline we were sending has become historic. Read the
3261 * timeline history file of the new timeline to see where exactly
3262 * we forked off from the timeline we were sending.
3263 */
3264 List *history;
3265
3266 history = readTimeLineHistory(SendRqstTLI);
3268
3270 list_free_deep(history);
3271
3273
3274 SendRqstPtr = sendTimeLineValidUpto;
3275 }
3276 }
3277 else
3278 {
3279 /*
3280 * Streaming the current timeline on a primary.
3281 *
3282 * Attempt to send all data that's already been written out and
3283 * fsync'd to disk. We cannot go further than what's been written out
3284 * given the current implementation of WALRead(). And in any case
3285 * it's unsafe to send WAL that is not securely down to disk on the
3286 * primary: if the primary subsequently crashes and restarts, standbys
3287 * must not have applied any WAL that got lost on the primary.
3288 */
3289 SendRqstPtr = GetFlushRecPtr(NULL);
3290 }
3291
3292 /*
3293 * Record the current system time as an approximation of the time at which
3294 * this WAL location was written for the purposes of lag tracking.
3295 *
3296 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3297 * is flushed and we could get that time as well as the LSN when we call
3298 * GetFlushRecPtr() above (and likewise for the cascading standby
3299 * equivalent), but rather than putting any new code into the hot WAL path
3300 * it seems good enough to capture the time here. We should reach this
3301 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3302 * may take some time, we read the WAL flush pointer and take the time
3303 * very close together here so that we'll get a later position if it is
3304 * still moving.
3305 *
3306 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3307 * this gives us a cheap approximation for the WAL flush time for this
3308 * LSN.
3309 *
3310 * Note that the LSN is not necessarily the LSN for the data contained in
3311 * the present message; it's the end of the WAL, which might be further
3312 * ahead. All the lag tracking machinery cares about is finding out when
3313 * that arbitrary LSN is eventually reported as written, flushed and
3314 * applied, so that it can measure the elapsed time.
3315 */
3316 LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3317
3318 /*
3319 * If this is a historic timeline and we've reached the point where we
3320 * forked to the next timeline, stop streaming.
3321 *
3322 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3323 * startup process will normally replay all WAL that has been received
3324 * from the primary, before promoting, but if the WAL streaming is
3325 * terminated at a WAL page boundary, the valid portion of the timeline
3326 * might end in the middle of a WAL record. We might've already sent the
3327 * first half of that partial WAL record to the cascading standby, so that
3328 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3329 * replay the partial WAL record either, so it can still follow our
3330 * timeline switch.
3331 */
3333 {
3334 /* close the current file. */
3335 if (xlogreader->seg.ws_file >= 0)
3337
3338 /* Send CopyDone */
3340 streamingDoneSending = true;
3341
3342 WalSndCaughtUp = true;
3343
3344 elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
3347 return;
3348 }
3349
3350 /* Do we have any work to do? */
3351 Assert(sentPtr <= SendRqstPtr);
3352 if (SendRqstPtr <= sentPtr)
3353 {
3354 WalSndCaughtUp = true;
3355 return;
3356 }
3357
3358 /*
3359 * Figure out how much to send in one message. If there's no more than
3360 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3361 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3362 *
3363 * The rounding is not only for performance reasons. Walreceiver relies on
3364 * the fact that we never split a WAL record across two messages. Since a
3365 * long WAL record is split at page boundary into continuation records,
3366 * page boundary is always a safe cut-off point. We also assume that
3367 * SendRqstPtr never points to the middle of a WAL record.
3368 */
3369 startptr = sentPtr;
3370 endptr = startptr;
3371 endptr += MAX_SEND_SIZE;
3372
3373 /* if we went beyond SendRqstPtr, back off */
3374 if (SendRqstPtr <= endptr)
3375 {
3376 endptr = SendRqstPtr;
3378 WalSndCaughtUp = false;
3379 else
3380 WalSndCaughtUp = true;
3381 }
3382 else
3383 {
3384 /* round down to page boundary. */
3385 endptr -= (endptr % XLOG_BLCKSZ);
3386 WalSndCaughtUp = false;
3387 }
3388
3389 nbytes = endptr - startptr;
3390 Assert(nbytes <= MAX_SEND_SIZE);
3391
3392 /*
3393 * OK to read and send the slice.
3394 */
3397
3398 pq_sendint64(&output_message, startptr); /* dataStart */
3399 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3400 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3401
3402 /*
3403 * Read the log directly into the output buffer to avoid extra memcpy
3404 * calls.
3405 */
3407
3408retry:
3409 /* attempt to read WAL from WAL buffers first */
3411 startptr, nbytes, xlogreader->seg.ws_tli);
3412 output_message.len += rbytes;
3413 startptr += rbytes;
3414 nbytes -= rbytes;
3415
3416 /* now read the remaining WAL from WAL file */
3417 if (nbytes > 0 &&
3420 startptr,
3421 nbytes,
3422 xlogreader->seg.ws_tli, /* Pass the current TLI because
3423 * only WalSndSegmentOpen controls
3424 * whether new TLI is needed. */
3425 &errinfo))
3426 WALReadRaiseError(&errinfo);
3427
3428 /* See logical_read_xlog_page(). */
3429 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3431
3432 /*
3433 * During recovery, the currently-open WAL file might be replaced with the
3434 * file of the same name retrieved from archive. So we always need to
3435 * check what we read was valid after reading into the buffer. If it's
3436 * invalid, we try to open and read the file again.
3437 */
3439 {
3440 WalSnd *walsnd = MyWalSnd;
3441 bool reload;
3442
3443 SpinLockAcquire(&walsnd->mutex);
3444 reload = walsnd->needreload;
3445 walsnd->needreload = false;
3446 SpinLockRelease(&walsnd->mutex);
3447
3448 if (reload && xlogreader->seg.ws_file >= 0)
3449 {
3451
3452 goto retry;
3453 }
3454 }
3455
3456 output_message.len += nbytes;
3458
3459 /*
3460 * Fill the send timestamp last, so that it is taken as late as possible.
3461 */
3464 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3465 tmpbuf.data, sizeof(int64));
3466
3468
3469 sentPtr = endptr;
3470
3471 /* Update shared memory status */
3472 {
3473 WalSnd *walsnd = MyWalSnd;
3474
3475 SpinLockAcquire(&walsnd->mutex);
3476 walsnd->sentPtr = sentPtr;
3477 SpinLockRelease(&walsnd->mutex);
3478 }
3479
3480 /* Report progress of XLOG streaming in PS display */
3482 {
3483 char activitymsg[50];
3484
3485 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
3487 set_ps_display(activitymsg);
3488 }
3489}
bool update_process_title
Definition: ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:337
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:270
#define MAX_SEND_SIZE
Definition: walsender.c:114
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition: xlog.c:1754

References am_cascading_walsender, Assert(), CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqMsg_CopyDone, PqReplMsg_WALData, readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 126 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 252 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 216 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

◆ replication_active

volatile sig_atomic_t replication_active = false
static

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 199 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 155 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 156 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

bool waiting_for_ping_response = false
static

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 138 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader