PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void ParseAlterReplSlotOptions (AlterReplicationSlotCmd *cmd, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void ProcessPendingWrites (void)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 218 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 236 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd *  cmd)
static

Definition at line 1439 of file walsender.c.

1440 {
1441  bool failover = false;
1442 
1443  ParseAlterReplSlotOptions(cmd, &failover);
1444  ReplicationSlotAlter(cmd->slotname, failover);
1445 }
void ReplicationSlotAlter(const char *name, bool failover)
Definition: slot.c:807
static void ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
Definition: walsender.c:1414

References ParseAlterReplSlotOptions(), ReplicationSlotAlter(), and AlterReplicationSlotCmd::slotname.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd *  cmd)
static

Definition at line 1200 of file walsender.c.

1201 {
1202  const char *snapshot_name = NULL;
1203  char xloc[MAXFNAMELEN];
1204  char *slot_name;
1205  bool reserve_wal = false;
1206  bool two_phase = false;
1207  bool failover = false;
1208  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1209  DestReceiver *dest;
1210  TupOutputState *tstate;
1211  TupleDesc tupdesc;
1212  Datum values[4];
1213  bool nulls[4] = {0};
1214 
1215  Assert(!MyReplicationSlot);
1216 
1217  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1218  &failover);
1219 
1220  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1221  {
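1222  ReplicationSlotCreate(cmd->slotname, false,
1223  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1224  false, false, false);
1225 
1226  if (reserve_wal)
1227  {
1228  ReplicationSlotReserveWal();
1229 
1230  ReplicationSlotMarkDirty();
1231 
1232  /* Write this slot to disk if it's a permanent one. */
1233  if (!cmd->temporary)
1234  ReplicationSlotSave();
1235  }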
1236  }
1237  else
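1238  {
1239  LogicalDecodingContext *ctx;
1240  bool need_full_snapshot = false;
1241 
1242  Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
1243 
1244  CheckLogicalDecodingRequirements();
1245 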
1246  /*
1247  * Initially create persistent slot as ephemeral - that allows us to
1248  * nicely handle errors during initialization because it'll get
1249  * dropped if this transaction fails. We'll make it persistent at the
1250  * end. Temporary slots can be created as temporary from beginning as
1251  * they get dropped on error as well.
1252  */
1253  ReplicationSlotCreate(cmd->slotname, true,
1254  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1255  two_phase, failover, false);
1256 
1257  /*
1258  * Do options check early so that we can bail before calling the
1259  * DecodingContextFindStartpoint which can take long time.
1260  */
1261  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1262  {
1263  if (IsTransactionBlock())
1264  ereport(ERROR,
1265  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1266  (errmsg("%s must not be called inside a transaction",
1267  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1268 
1269  need_full_snapshot = true;
1270  }
1271  else if (snapshot_action == CRS_USE_SNAPSHOT)
1272  {
1273  if (!IsTransactionBlock())
1274  ereport(ERROR,
1275  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1276  (errmsg("%s must be called inside a transaction",
1277  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1278 
1279  if (XactIsoLevel != XACT_REPEATABLE_READ)
1280  ereport(ERROR,
1281  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1282  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1283  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1284  if (!XactReadOnly)
1285  ereport(ERROR,
1286  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1287  (errmsg("%s must be called in a read-only transaction",
1288  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1289 
1290  if (FirstSnapshotSet)
1291  ereport(ERROR,
1292  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1293  (errmsg("%s must be called before any query",
1294  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1295 
1296  if (IsSubTransaction())
1297  ereport(ERROR,
1298  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1299  (errmsg("%s must not be called in a subtransaction",
1300  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1301 
1302  need_full_snapshot = true;
1303  }
1304 
1305  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1306  InvalidXLogRecPtr,
1307  XL_ROUTINE(.page_read = logical_read_xlog_page,
1308  .segment_open = WalSndSegmentOpen,
1309  .segment_close = wal_segment_close),
1310  WalSndPrepareWrite, WalSndWriteData,
1311  WalSndUpdateProgress);
1312 
1313  /*
1314  * Signal that we don't need the timeout mechanism. We're just
1315  * creating the replication slot and don't yet accept feedback
1316  * messages or send keepalives. As we possibly need to wait for
1317  * further WAL the walsender would otherwise possibly be killed too
1318  * soon.
1319  */
1320  last_reply_timestamp = 0;
1321 
1322  /* build initial snapshot, might take a while */
1323  DecodingContextFindStartpoint(ctx);
1324 
1325  /*
1326  * Export or use the snapshot if we've been asked to do so.
1327  *
1328  * NB. We will convert the snapbuild.c kind of snapshot to normal
1329  * snapshot when doing this.
1330  */
1331  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1332  {
1333  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1334  }
1335  else if (snapshot_action == CRS_USE_SNAPSHOT)
1336  {
1337  Snapshot snap;
1338 
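1339  snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1340  RestoreTransactionSnapshot(snap, MyProc);
1341  }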
1342 
1343  /* don't need the decoding context anymore */
1344  FreeDecodingContext(ctx);
1345 
1346  if (!cmd->temporary)
1347  ReplicationSlotPersist();
1348  }
1349 
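1350  snprintf(xloc, sizeof(xloc), "%X/%X",
1351  LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1352 
1353  dest = CreateDestReceiver(DestRemoteSimple);
1354 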
1355  /*----------
1356  * Need a tuple descriptor representing four columns:
1357  * - first field: the slot name
1358  * - second field: LSN at which we became consistent
1359  * - third field: exported snapshot's name
1360  * - fourth field: output plugin
1361  */
1362  tupdesc = CreateTemplateTupleDesc(4);
1363  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1364  TEXTOID, -1, 0);
1365  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1366  TEXTOID, -1, 0);
1367  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1368  TEXTOID, -1, 0);
1369  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1370  TEXTOID, -1, 0);
1371 
1372  /* prepare for projection of tuples */
1373  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1374 
1375  /* slot_name */
1376  slot_name = NameStr(MyReplicationSlot->data.name);
1377  values[0] = CStringGetTextDatum(slot_name);
1378 
1379  /* consistent wal location */
1380  values[1] = CStringGetTextDatum(xloc);
1381 
1382  /* snapshot name, or NULL if none */
1383  if (snapshot_name != NULL)
1384  values[2] = CStringGetTextDatum(snapshot_name);
1385  else
1386  nulls[2] = true;
1387 
1388  /* plugin, or NULL if none */
1389  if (cmd->plugin != NULL)
1390  values[3] = CStringGetTextDatum(cmd->plugin);
1391  else
1392  nulls[3] = true;
1393 
1394  /* send it to dest */
1395  do_tup_output(tstate, values, nulls);
1396  end_tup_output(tstate);
1397 
1398  ReplicationSlotRelease();
1399 }
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:746
#define Assert(condition)
Definition: c.h:858
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2362
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2420
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2342
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:693
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:649
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:109
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:329
#define NIL
Definition: pg_list.h:68
static bool two_phase
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:309
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1010
void ReplicationSlotReserveWal(void)
Definition: slot.c:1401
void ReplicationSlotPersist(void)
Definition: slot.c:1027
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
void ReplicationSlotSave(void)
Definition: slot.c:992
void ReplicationSlotRelease(void)
Definition: slot.c:652
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:668
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:569
bool FirstSnapshotSet
Definition: snapmgr.c:135
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1840
PGPROC * MyProc
Definition: proc.c:66
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:104
ReplicationSlotPersistentData data
Definition: slot.h:178
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:67
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:726
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1123
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2988
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1572
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1668
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1051
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1545
static TimestampTz last_reply_timestamp
Definition: walsender.c:179
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:80
int XactIsoLevel
Definition: xact.c:77
bool IsSubTransaction(void)
Definition: xact.c:4988
bool IsTransactionBlock(void)
Definition: xact.c:4915
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd *  cmd)
static

Definition at line 1405 of file walsender.c.

1406 {
1407  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1408 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:784

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1977 of file walsender.c.

1978 {
1979  int parse_rc;
1980  Node *cmd_node;
1981  const char *cmdtag;
1982  MemoryContext cmd_context;
1983  MemoryContext old_context;
1984 
1985  /*
1986  * If WAL sender has been told that shutdown is getting close, switch its
1987  * status accordingly to handle the next replication commands correctly.
1988  */
1989  if (got_STOPPING)
1990  WalSndSetState(WALSNDSTATE_STOPPING);
1991 
1992  /*
1993  * Throw error if in stopping mode. We need prevent commands that could
1994  * generate WAL while the shutdown checkpoint is being written. To be
1995  * safe, we just prohibit all new commands.
1996  */
1997  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1998  ereport(ERROR,
1999  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2000  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2001 
2002  /*
2003  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2004  * command arrives. Clean up the old stuff if there's anything.
2005  */
2006  SnapBuildClearExportedSnapshot();
2007 
2008  CHECK_FOR_INTERRUPTS();
2009 
2010  /*
2011  * Prepare to parse and execute the command.
2012  */
2013  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
2014  "Replication command context",
2015  ALLOCSET_DEFAULT_SIZES);
2016  old_context = MemoryContextSwitchTo(cmd_context);
2017 
2018  replication_scanner_init(cmd_string);
2019 
2020  /*
2021  * Is it a WalSender command?
2022  */
2023  if (!replication_scanner_is_replication_command())
2024  {
2025  /* Nope; clean up and get out. */
2026  replication_scanner_finish();
2027 
2028  MemoryContextSwitchTo(old_context);
2029  MemoryContextDelete(cmd_context);
2030 
2031  /* XXX this is a pretty random place to make this check */
2032  if (MyDatabaseId == InvalidOid)
2033  ereport(ERROR,
2034  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2035  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2036 
2037  /* Tell the caller that this wasn't a WalSender command. */
2038  return false;
2039  }
2040 
2041  /*
2042  * Looks like a WalSender command, so parse it.
2043  */
2044  parse_rc = replication_yyparse();
2045  if (parse_rc != 0)
2046  ereport(ERROR,
2047  (errcode(ERRCODE_SYNTAX_ERROR),
2048  errmsg_internal("replication command parser returned %d",
2049  parse_rc)));
2050  replication_scanner_finish();
2051 
2052  cmd_node = replication_parse_result;
2053 
2054  /*
2055  * Report query to various monitoring facilities. For this purpose, we
2056  * report replication commands just like SQL commands.
2057  */
2058  debug_query_string = cmd_string;
2059 
2060  pgstat_report_activity(STATE_RUNNING, cmd_string);
2061 
2062  /*
2063  * Log replication command if log_replication_commands is enabled. Even
2064  * when it's disabled, log the command with DEBUG1 level for backward
2065  * compatibility.
2066  */
2067  ereport(log_replication_commands ? LOG : DEBUG1,
2068  (errmsg("received replication command: %s", cmd_string)));
2069 
2070  /*
2071  * Disallow replication commands in aborted transaction blocks.
2072  */
2073  if (IsAbortedTransactionBlockState())
2074  ereport(ERROR,
2075  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2076  errmsg("current transaction is aborted, "
2077  "commands ignored until end of transaction block")));
2078 
2079  CHECK_FOR_INTERRUPTS();
2080 
2081  /*
2082  * Allocate buffers that will be used for each outgoing and incoming
2083  * message. We do this just once per command to reduce palloc overhead.
2084  */
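2085  initStringInfo(&output_message);
2086  initStringInfo(&reply_message);
2087  initStringInfo(&tmpbuf);
2088 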
2089  switch (cmd_node->type)
2090  {
2091  case T_IdentifySystemCmd:
2092  cmdtag = "IDENTIFY_SYSTEM";
2093  set_ps_display(cmdtag);
2094  IdentifySystem();
2095  EndReplicationCommand(cmdtag);
2096  break;
2097 
2098  case T_ReadReplicationSlotCmd:
2099  cmdtag = "READ_REPLICATION_SLOT";
2100  set_ps_display(cmdtag);
2101  ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
2102  EndReplicationCommand(cmdtag);
2103  break;
2104 
2105  case T_BaseBackupCmd:
2106  cmdtag = "BASE_BACKUP";
2107  set_ps_display(cmdtag);
2108  PreventInTransactionBlock(true, cmdtag);
2109  SendBaseBackup((BaseBackupCmd *) cmd_node, uploaded_manifest);
2110  EndReplicationCommand(cmdtag);
2111  break;
2112 
2113  case T_CreateReplicationSlotCmd:
2114  cmdtag = "CREATE_REPLICATION_SLOT";
2115  set_ps_display(cmdtag);
2116  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
2117  EndReplicationCommand(cmdtag);
2118  break;
2119 
2120  case T_DropReplicationSlotCmd:
2121  cmdtag = "DROP_REPLICATION_SLOT";
2122  set_ps_display(cmdtag);
2123  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
2124  EndReplicationCommand(cmdtag);
2125  break;
2126 
2127  case T_AlterReplicationSlotCmd:
2128  cmdtag = "ALTER_REPLICATION_SLOT";
2129  set_ps_display(cmdtag);
2130  AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
2131  EndReplicationCommand(cmdtag);
2132  break;
2133 
2134  case T_StartReplicationCmd:
2135  {
2136  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2137 
2138  cmdtag = "START_REPLICATION";
2139  set_ps_display(cmdtag);
2140  PreventInTransactionBlock(true, cmdtag);
2141 
2142  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2143  StartReplication(cmd);
2144  else
2145  StartLogicalReplication(cmd);
2146 
2147  /* dupe, but necessary per libpqrcv_endstreaming */
2148  EndReplicationCommand(cmdtag);
2149 
2150  Assert(xlogreader != NULL);
2151  break;
2152  }
2153 
2154  case T_TimeLineHistoryCmd:
2155  cmdtag = "TIMELINE_HISTORY";
2156  set_ps_display(cmdtag);
2157  PreventInTransactionBlock(true, cmdtag);
2158  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
2159  EndReplicationCommand(cmdtag);
2160  break;
2161 
2162  case T_VariableShowStmt:
2163  {
2164  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
2165  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2166 
2167  cmdtag = "SHOW";
2168  set_ps_display(cmdtag);
2169 
2170  /* syscache access needs a transaction environment */
2171  StartTransactionCommand();
2172  GetPGVariable(n->name, dest);
2173  CommitTransactionCommand();
2174  EndReplicationCommand(cmdtag);
2175  }
2176  break;
2177 
2178  case T_UploadManifestCmd:
2179  cmdtag = "UPLOAD_MANIFEST";
2180  set_ps_display(cmdtag);
2181  PreventInTransactionBlock(true, cmdtag);
2182  UploadManifest();
2183  EndReplicationCommand(cmdtag);
2184  break;
2185 
2186  default:
2187  elog(ERROR, "unrecognized replication command node tag: %u",
2188  cmd_node->type);
2189  }
2190 
2191  /* done */
2192  MemoryContextSwitchTo(old_context);
2193  MemoryContextDelete(cmd_context);
2194 
2195  /*
2196  * We need not update ps display or pg_stat_activity, because PostgresMain
2197  * will reset those to "idle". But we must reset debug_query_string to
2198  * ensure it doesn't become a dangling pointer.
2199  */
2200  debug_query_string = NULL;
2201 
2202  return true;
2203 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:988
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
int errcode(int sqlerrcode)
Definition: elog.c:857
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:224
Oid MyDatabaseId
Definition: globals.c:91
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const char * debug_query_string
Definition: postgres.c:87
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
MemoryContextSwitchTo(old_ctx)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:729
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1439
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:589
WalSnd * MyWalSnd
Definition: walsender.c:112
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:490
static StringInfoData tmpbuf
Definition: walsender.c:170
static void IdentifySystem(void)
Definition: walsender.c:407
static StringInfoData reply_message
Definition: walsender.c:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3809
static StringInfoData output_message
Definition: walsender.c:168
static void UploadManifest(void)
Definition: walsender.c:679
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:198
bool log_replication_commands
Definition: walsender.c:125
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1200
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1452
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:147
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1405
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:819
static XLogReaderState * xlogreader
Definition: walsender.c:137
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3584
void StartTransactionCommand(void)
Definition: xact.c:2995
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:404
void CommitTransactionCommand(void)
Definition: xact.c:3093

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli)

Definition at line 3504 of file walsender.c.

3505 {
3506  XLogRecPtr replayPtr;
3507  TimeLineID replayTLI;
3508  XLogRecPtr receivePtr;
3509  TimeLineID receiveTLI;
3510  XLogRecPtr result;
3511 
3512  Assert(am_cascading_walsender || IsSyncingReplicationSlots());
3513 
3514  /*
3515  * We can safely send what's already been replayed. Also, if walreceiver
3516  * is streaming WAL from the same timeline, we can send anything that it
3517  * has streamed, but hasn't been replayed yet.
3518  */
3519 
3520  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3521  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3522 
3523  if (tli)
3524  *tli = replayTLI;
3525 
3526  result = replayPtr;
3527  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3528  result = receivePtr;
3529 
3530  return result;
3531 }
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1650
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:116
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *  offset,
IncrementalBackupInfo *  ib 
)
static

Definition at line 743 of file walsender.c.

745 {
746  int mtype;
747  int maxmsglen;
748 
749  HOLD_CANCEL_INTERRUPTS();
750 
751  pq_startmsgread();
752  mtype = pq_getbyte();
753  if (mtype == EOF)
754  ereport(ERROR,
755  (errcode(ERRCODE_CONNECTION_FAILURE),
756  errmsg("unexpected EOF on client connection with an open transaction")));
757 
758  switch (mtype)
759  {
760  case 'd': /* CopyData */
761  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
762  break;
763  case 'c': /* CopyDone */
764  case 'f': /* CopyFail */
765  case 'H': /* Flush */
766  case 'S': /* Sync */
767  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
768  break;
769  default:
770  ereport(ERROR,
771  (errcode(ERRCODE_PROTOCOL_VIOLATION),
772  errmsg("unexpected message type 0x%02X during COPY from stdin",
773  mtype)));
774  maxmsglen = 0; /* keep compiler quiet */
775  break;
776  }
777 
778  /* Now collect the message body */
779  if (pq_getmessage(buf, maxmsglen))
780  ereport(ERROR,
781  (errcode(ERRCODE_CONNECTION_FAILURE),
782  errmsg("unexpected EOF on client connection with an open transaction")));
783  RESUME_CANCEL_INTERRUPTS();
784 
785  /* Process the message */
786  switch (mtype)
787  {
788  case 'd': /* CopyData */
789  AppendIncrementalManifestData(ib, buf->data, buf->len);
790  return true;
791 
792  case 'c': /* CopyDone */
793  return false;
794 
795  case 'H': /* Flush */
796  case 'S': /* Sync */
797  /* Ignore these while in CopyIn mode as we do elsewhere. */
798  return true;
799 
800  case 'f':
801  ereport(ERROR,
802  (errcode(ERRCODE_QUERY_CANCELED),
803  errmsg("COPY from stdin failed: %s",
804  pq_getmsgstring(buf))));
805  }
806 
807  /* Not reached. */
808  Assert(false);
809  return false;
810 }
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:141
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:143
static char * buf
Definition: pg_test_fsync.c:73
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1202
int pq_getbyte(void)
Definition: pqcomm.c:963
void pq_startmsgread(void)
Definition: pqcomm.c:1140
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), errmsg(), ERROR, HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3560 of file walsender.c.

3561 {
3562  Assert(am_walsender);
3563 
3564  /*
3565  * If replication has not yet started, die like with SIGTERM. If
3566  * replication is active, only set a flag and wake up the main loop. It
3567  * will send any outstanding WAL, wait for it to be replicated to the
3568  * standby, and then exit gracefully.
3569  */
3570  if (!replication_active)
3571  kill(MyProcPid, SIGTERM);
3572  else
3573  got_STOPPING = true;
3574 }
int MyProcPid
Definition: globals.c:45
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:206
#define kill(pid, sig)
Definition: win32_port.h:485

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 407 of file walsender.c.

408 {
409  char sysid[32];
410  char xloc[MAXFNAMELEN];
411  XLogRecPtr logptr;
412  char *dbname = NULL;
413  DestReceiver *dest;
414  TupOutputState *tstate;
415  TupleDesc tupdesc;
416  Datum values[4];
417  bool nulls[4] = {0};
418  TimeLineID currTLI;
419 
420  /*
421  * Reply with a result set with one row, four columns. First col is system
422  * ID, second is timeline ID, third is current xlog location and the
423  * fourth contains the database name if we are connected to one.
424  */
425 
426  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
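427  GetSystemIdentifier());
428 
429  am_cascading_walsender = RecoveryInProgress();
430  if (am_cascading_walsender)
431  logptr = GetStandbyFlushRecPtr(&currTLI);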
432  else
433  logptr = GetFlushRecPtr(&currTLI);
434 
435  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
436 
437  if (MyDatabaseId != InvalidOid)
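438  {
439  MemoryContext cur = CurrentMemoryContext;
440 
441  /* syscache access needs a transaction env. */
442  StartTransactionCommand();
443  /* make dbname live outside TX context */
444  MemoryContextSwitchTo(cur);
445  dbname = get_database_name(MyDatabaseId);
446  CommitTransactionCommand();
447  /* CommitTransactionCommand switches to TopMemoryContext */
448  MemoryContextSwitchTo(cur);
449  }
450 
451  dest = CreateDestReceiver(DestRemoteSimple);
452 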
453  /* need a tuple descriptor representing four columns */
454  tupdesc = CreateTemplateTupleDesc(4);
455  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
456  TEXTOID, -1, 0);
457  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
458  INT8OID, -1, 0);
459  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
460  TEXTOID, -1, 0);
461  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
462  TEXTOID, -1, 0);
463 
464  /* prepare for projection of tuples */
465  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
466 
467  /* column 1: system identifier */
468  values[0] = CStringGetTextDatum(sysid);
469 
470  /* column 2: timeline */
471  values[1] = Int64GetDatum(currTLI);
472 
473  /* column 3: wal location */
474  values[2] = CStringGetTextDatum(xloc);
475 
476  /* column 4: database name, or NULL if none */
477  if (dbname)
478  values[3] = CStringGetTextDatum(dbname);
479  else
480  nulls[3] = true;
481 
482  /* send it to dest */
483  do_tup_output(tstate, values, nulls);
484 
485  end_tup_output(tstate);
486 }
#define UINT64_FORMAT
Definition: c.h:549
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3154
struct cursor * cur
Definition: ecpg.c:28
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
char * dbname
Definition: streamutil.c:52
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3504
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4535
bool RecoveryInProgress(void)
Definition: xlog.c:6290
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6455

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextSwitchTo(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2893 of file walsender.c.

2894 {
2895  int i;
2896 
2897  /*
2898  * WalSndCtl should be set up already (we inherit this by fork() or
2899  * EXEC_BACKEND mechanism from the postmaster).
2900  */
2901  Assert(WalSndCtl != NULL);
2902  Assert(MyWalSnd == NULL);
2903 
2904  /*
2905  * Find a free walsender slot and reserve it. This must not fail due to
2906  * the prior check for free WAL senders in InitProcess().
2907  */
2908  for (i = 0; i < max_wal_senders; i++)
2909  {
2910  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2911 
2912  SpinLockAcquire(&walsnd->mutex);
2913 
2914  if (walsnd->pid != 0)
2915  {
2916  SpinLockRelease(&walsnd->mutex);
2917  continue;
2918  }
2919  else
2920  {
2921  /*
2922  * Found a free slot. Reserve it for us.
2923  */
2924  walsnd->pid = MyProcPid;
2925  walsnd->state = WALSNDSTATE_STARTUP;
2926  walsnd->sentPtr = InvalidXLogRecPtr;
2927  walsnd->needreload = false;
2928  walsnd->write = InvalidXLogRecPtr;
2929  walsnd->flush = InvalidXLogRecPtr;
2930  walsnd->apply = InvalidXLogRecPtr;
2931  walsnd->writeLag = -1;
2932  walsnd->flushLag = -1;
2933  walsnd->applyLag = -1;
2934  walsnd->sync_standby_priority = 0;
2935  walsnd->latch = &MyProc->procLatch;
2936  walsnd->replyTime = 0;
2937 
2938  /*
2939  * The kind assignment is done here and not in StartReplication()
2940  * and StartLogicalReplication(). Indeed, the logical walsender
2941  * needs to read WAL records (like snapshot of running
2942  * transactions) during the slot creation. So it needs to be woken
2943  * up based on its kind.
2944  *
2945  * The kind assignment could also be done in StartReplication(),
2946  * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2947  * seems better to set it on one place.
2948  */
2949  if (MyDatabaseId == InvalidOid)
2950  walsnd->kind = REPLICATION_KIND_PHYSICAL;
2951  else
2952  walsnd->kind = REPLICATION_KIND_LOGICAL;
2953 
2954  SpinLockRelease(&walsnd->mutex);
2955  /* don't need the lock anymore */
2956  MyWalSnd = (WalSnd *) walsnd;
2957 
2958  break;
2959  }
2960  }
2961 
2962  Assert(MyWalSnd != NULL);
2963 
2964  /* Arrange to clean up at walsender exit */
2965  on_shmem_exit(WalSndKill, 0);
2966 }
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch procLatch
Definition: proc.h:165
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:121
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2970
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProc, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4160 of file walsender.c.

4161 {
4162  TimestampTz time = 0;
4163 
4164  /* Read all unread samples up to this LSN or end of buffer. */
4165  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4166  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
4167  {
4168  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4169  lag_tracker->last_read[head] =
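4170  lag_tracker->buffer[lag_tracker->read_heads[head]];
4171  lag_tracker->read_heads[head] =
4172  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
4173  }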
4174 
4175  /*
4176  * If the lag tracker is empty, that means the standby has processed
4177  * everything we've ever sent so we should now clear 'last_read'. If we
4178  * didn't do that, we'd risk using a stale and irrelevant sample for
4179  * interpolation at the beginning of the next burst of WAL after a period
4180  * of idleness.
4181  */
4182  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
4183  lag_tracker->last_read[head].time = 0;
4184 
4185  if (time > now)
4186  {
4187  /* If the clock somehow went backwards, treat as not found. */
4188  return -1;
4189  }
4190  else if (time == 0)
4191  {
4192  /*
4193  * We didn't cross a time. If there is a future sample that we
4194  * haven't reached yet, and we've already reached at least one sample,
4195  * let's interpolate the local flushed time. This is mainly useful
4196  * for reporting a completely stuck apply position as having
4197  * increasing lag, since otherwise we'd have to wait for it to
4198  * eventually start moving again and cross one of our samples before
4199  * we can show the lag increasing.
4200  */
4201  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
4202  {
4203  /* There are no future samples, so we can't interpolate. */
4204  return -1;
4205  }
4206  else if (lag_tracker->last_read[head].time != 0)
4207  {
4208  /* We can interpolate between last_read and the next sample. */
4209  double fraction;
4210  WalTimeSample prev = lag_tracker->last_read[head];
4211  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
4212 
4213  if (lsn < prev.lsn)
4214  {
4215  /*
4216  * Reported LSNs shouldn't normally go backwards, but it's
4217  * possible when there is a timeline change. Treat as not
4218  * found.
4219  */
4220  return -1;
4221  }
4222 
4223  Assert(prev.lsn < next.lsn);
4224 
4225  if (prev.time > next.time)
4226  {
4227  /* If the clock somehow went backwards, treat as not found. */
4228  return -1;
4229  }
4230 
4231  /* See how far we are between the previous and next samples. */
4232  fraction =
4233  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4234 
4235  /* Scale the local flush time proportionally. */
4236  time = (TimestampTz)
4237  ((double) prev.time + (next.time - prev.time) * fraction);
4238  }
4239  else
4240  {
4241  /*
4242  * We have only a future sample, implying that we were entirely
4243  * caught up, and now there is a new burst of WAL and the
4244  * standby hasn't processed the first sample yet. Until the
4245  * standby reaches the future sample the best we can do is report
4246  * the hypothetical lag if that sample were to be replayed now.
4247  */
4248  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4249  }
4250  }
4251 
4252  /* Return the elapsed time since local flush time in microseconds. */
4253  Assert(time != 0);
4254  return now - time;
4255 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
static int32 next
Definition: blutils.c:221
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:224
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:226
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:227
int write_head
Definition: walsender.c:225
TimestampTz time
Definition: walsender.c:214
XLogRecPtr lsn
Definition: walsender.c:213
static LagTracker * lag_tracker
Definition: walsender.c:230
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:218

References Assert, LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4095 of file walsender.c.

4096 {
4097  bool buffer_full;
4098  int new_write_head;
4099  int i;
4100 
4101  if (!am_walsender)
4102  return;
4103 
4104  /*
4105  * If the lsn hasn't advanced since last time, then do nothing. This way
4106  * we only record a new sample when new WAL has been written.
4107  */
4108  if (lag_tracker->last_lsn == lsn)
4109  return;
4110  lag_tracker->last_lsn = lsn;
4111 
4112  /*
4113  * If advancing the write head of the circular buffer would crash into any
4114  * of the read heads, then the buffer is full. In other words, the
4115  * slowest reader (presumably apply) is the one that controls the release
4116  * of space.
4117  */
4118  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4119  buffer_full = false;
4120  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4121  {
4122  if (new_write_head == lag_tracker->read_heads[i])
4123  buffer_full = true;
4124  }
4125 
4126  /*
4127  * If the buffer is full, for now we just rewind by one slot and overwrite
4128  * the last sample, as a simple (if somewhat uneven) way to lower the
4129  * sampling rate. There may be better adaptive compaction algorithms.
4130  */
4131  if (buffer_full)
4132  {
4133  new_write_head = lag_tracker->write_head;
4134  if (lag_tracker->write_head > 0)
4135  lag_tracker->write_head--;
4136  else
4137  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
4138  }
4139 
4140  /* Store a sample at the current write head position. */
4141  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
4142  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4143  lag_tracker->write_head = new_write_head;
4144 }
XLogRecPtr last_lsn
Definition: walsender.c:223
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *  state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1051 of file walsender.c.

1053 {
1054  XLogRecPtr flushptr;
1055  int count;
1056  WALReadError errinfo;
1057  XLogSegNo segno;
1058  TimeLineID currTLI;
1059 
1060  /*
1061  * Make sure we have enough WAL available before retrieving the current
1062  * timeline. This is needed to determine am_cascading_walsender accurately
1063  * which is needed to determine the current timeline.
1064  */
1065  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1066 
1067  /*
1068  * Since logical decoding is also permitted on a standby server, we need
1069  * to check if the server is in recovery to decide how to get the current
1070  * timeline ID (so that it also cover the promotion or timeline change
1071  * cases).
1072  */
1073  am_cascading_walsender = RecoveryInProgress();
1074 
1075  if (am_cascading_walsender)
1076  GetXLogReplayRecPtr(&currTLI);
1077  else
1078  currTLI = GetWALInsertionTimeLine();
1079 
1080  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1081  sendTimeLineIsHistoric = (state->currTLI != currTLI);
1082  sendTimeLine = state->currTLI;
1083  sendTimeLineValidUpto = state->currTLIValidUntil;
1084  sendTimeLineNextTLI = state->nextTLI;
1085 
1086  /* fail if not (implies we are going to shut down) */
1087  if (flushptr < targetPagePtr + reqLen)
1088  return -1;
1089 
1090  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1091  count = XLOG_BLCKSZ; /* more than one block available */
1092  else
1093  count = flushptr - targetPagePtr; /* part of the page available */
1094 
1095  /* now actually read the data, we know it's there */
1096  if (!WALRead(state,
1097  cur_page,
1098  targetPagePtr,
1099  count,
1100  currTLI, /* Pass the current TLI because only
1101  * WalSndSegmentOpen controls whether new TLI
1102  * is needed. */
1103  &errinfo))
1104  WALReadRaiseError(&errinfo);
1105 
1106  /*
1107  * After reading into the buffer, check that what we read was valid. We do
1108  * this after reading, because even though the segment was present when we
1109  * opened it, it might get recycled or removed while we read it. The
1110  * read() succeeds in that case, but the data we tried to read might
1111  * already have been overwritten with new WAL records.
1112  */
1113  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1114  CheckXLogRemoved(segno, state->seg.ws_tli);
1115 
1116  return count;
1117 }
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:156
static bool sendTimeLineIsHistoric
Definition: walsender.c:158
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1818
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:157
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:159
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6476
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3716
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1503
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:718
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1020

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 *  wait_event 
)
static

Definition at line 1758 of file walsender.c.

1759 {
1760  int elevel = got_STOPPING ? ERROR : WARNING;
1761  bool failover_slot;
1762 
1763  failover_slot = (replication_active && MyReplicationSlot->data.failover);
1764 
1765  /*
1766  * Note that after receiving the shutdown signal, an ERROR is reported if
1767  * any slots are dropped, invalidated, or inactive. This measure is taken
1768  * to prevent the walsender from waiting indefinitely.
1769  */
1770  if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
1771  {
1772  *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1773  return true;
1774  }
1775 
1776  *wait_event = 0;
1777  return false;
1778 }
#define WARNING
Definition: elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2580

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 *  wait_event 
)
static

Definition at line 1790 of file walsender.c.

1792 {
1793  /* Check if we need to wait for WALs to be flushed to disk */
1794  if (target_lsn > flushed_lsn)
1795  {
1796  *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1797  return true;
1798  }
1799 
1800  /* Check if the standby slots have caught up to the flushed position */
1801  return NeedToWaitForStandbys(flushed_lsn, wait_event);
1802 }
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1758

References NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3847 of file walsender.c.

3848 {
3849  Interval *result = palloc(sizeof(Interval));
3850 
3851  result->month = 0;
3852  result->day = 0;
3853  result->time = offset;
3854 
3855  return result;
3856 }
void * palloc(Size size)
Definition: mcxt.c:1316
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ ParseAlterReplSlotOptions()

static void ParseAlterReplSlotOptions ( AlterReplicationSlotCmd *  cmd,
bool *  failover 
)
static

Definition at line 1414 of file walsender.c.

1415 {
1416  bool failover_given = false;
1417 
1418  /* Parse options */
1419  foreach_ptr(DefElem, defel, cmd->options)
1420  {
1421  if (strcmp(defel->defname, "failover") == 0)
1422  {
1423  if (failover_given)
1424  ereport(ERROR,
1425  (errcode(ERRCODE_SYNTAX_ERROR),
1426  errmsg("conflicting or redundant options")));
1427  failover_given = true;
1428  *failover = defGetBoolean(defel);
1429  }
1430  else
1431  elog(ERROR, "unrecognized option: %s", defel->defname);
1432  }
1433 }
bool defGetBoolean(DefElem *def)
Definition: define.c:107
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, foreach_ptr, and AlterReplicationSlotCmd::options.

Referenced by AlterReplicationSlot().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd *  cmd,
bool *  reserve_wal,
CRSSnapshotAction *  snapshot_action,
bool *  two_phase,
bool *  failover 
)
static

Definition at line 1123 of file walsender.c.

1127 {
1128  ListCell *lc;
1129  bool snapshot_action_given = false;
1130  bool reserve_wal_given = false;
1131  bool two_phase_given = false;
1132  bool failover_given = false;
1133 
1134  /* Parse options */
1135  foreach(lc, cmd->options)
1136  {
1137  DefElem *defel = (DefElem *) lfirst(lc);
1138 
1139  if (strcmp(defel->defname, "snapshot") == 0)
1140  {
1141  char *action;
1142 
1143  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1144  ereport(ERROR,
1145  (errcode(ERRCODE_SYNTAX_ERROR),
1146  errmsg("conflicting or redundant options")));
1147 
1148  action = defGetString(defel);
1149  snapshot_action_given = true;
1150 
1151  if (strcmp(action, "export") == 0)
1152  *snapshot_action = CRS_EXPORT_SNAPSHOT;
1153  else if (strcmp(action, "nothing") == 0)
1154  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1155  else if (strcmp(action, "use") == 0)
1156  *snapshot_action = CRS_USE_SNAPSHOT;
1157  else
1158  ereport(ERROR,
1159  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1160  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1161  defel->defname, action)));
1162  }
1163  else if (strcmp(defel->defname, "reserve_wal") == 0)
1164  {
1165  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1166  ereport(ERROR,
1167  (errcode(ERRCODE_SYNTAX_ERROR),
1168  errmsg("conflicting or redundant options")));
1169 
1170  reserve_wal_given = true;
1171  *reserve_wal = defGetBoolean(defel);
1172  }
1173  else if (strcmp(defel->defname, "two_phase") == 0)
1174  {
1175  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1176  ereport(ERROR,
1177  (errcode(ERRCODE_SYNTAX_ERROR),
1178  errmsg("conflicting or redundant options")));
1179  two_phase_given = true;
1180  *two_phase = defGetBoolean(defel);
1181  }
1182  else if (strcmp(defel->defname, "failover") == 0)
1183  {
1184  if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1185  ereport(ERROR,
1186  (errcode(ERRCODE_SYNTAX_ERROR),
1187  errmsg("conflicting or redundant options")));
1188  failover_given = true;
1189  *failover = defGetBoolean(defel);
1190  }
1191  else
1192  elog(ERROR, "unrecognized option: %s", defel->defname);
1193  }
1194 }
char * defGetString(DefElem *def)
Definition: define.c:48
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:815
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3863 of file walsender.c.

3864 {
3865 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3866  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3867  SyncRepStandbyData *sync_standbys;
3868  int num_standbys;
3869  int i;
3870 
3871  InitMaterializedSRF(fcinfo, 0);
3872 
3873  /*
3874  * Get the currently active synchronous standbys. This could be out of
3875  * date before we're done, but we'll use the data anyway.
3876  */
3877  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3878 
3879  for (i = 0; i < max_wal_senders; i++)
3880  {
3881  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3882  XLogRecPtr sent_ptr;
3883  XLogRecPtr write;
3884  XLogRecPtr flush;
3885  XLogRecPtr apply;
3886  TimeOffset writeLag;
3887  TimeOffset flushLag;
3888  TimeOffset applyLag;
3889  int priority;
3890  int pid;
3891  WalSndState state;
3892  TimestampTz replyTime;
3893  bool is_sync_standby;
3894  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3895  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3896  int j;
3897 
3898  /* Collect data from shared memory */
3899  SpinLockAcquire(&walsnd->mutex);
3900  if (walsnd->pid == 0)
3901  {
3902  SpinLockRelease(&walsnd->mutex);
3903  continue;
3904  }
3905  pid = walsnd->pid;
3906  sent_ptr = walsnd->sentPtr;
3907  state = walsnd->state;
3908  write = walsnd->write;
3909  flush = walsnd->flush;
3910  apply = walsnd->apply;
3911  writeLag = walsnd->writeLag;
3912  flushLag = walsnd->flushLag;
3913  applyLag = walsnd->applyLag;
3914  priority = walsnd->sync_standby_priority;
3915  replyTime = walsnd->replyTime;
3916  SpinLockRelease(&walsnd->mutex);
3917 
3918  /*
3919  * Detect whether walsender is/was considered synchronous. We can
3920  * provide some protection against stale data by checking the PID
3921  * along with walsnd_index.
3922  */
3923  is_sync_standby = false;
3924  for (j = 0; j < num_standbys; j++)
3925  {
3926  if (sync_standbys[j].walsnd_index == i &&
3927  sync_standbys[j].pid == pid)
3928  {
3929  is_sync_standby = true;
3930  break;
3931  }
3932  }
3933 
3934  values[0] = Int32GetDatum(pid);
3935 
3936  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3937  {
3938  /*
3939  * Only superusers and roles with privileges of pg_read_all_stats
3940  * can see details. Other users only get the pid value to know
3941  * it's a walsender, but no details.
3942  */
3943  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3944  }
3945  else
3946  {
3947  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3948 
3949  if (XLogRecPtrIsInvalid(sent_ptr))
3950  nulls[2] = true;
3951  values[2] = LSNGetDatum(sent_ptr);
3952 
3953  if (XLogRecPtrIsInvalid(write))
3954  nulls[3] = true;
3955  values[3] = LSNGetDatum(write);
3956 
3957  if (XLogRecPtrIsInvalid(flush))
3958  nulls[4] = true;
3959  values[4] = LSNGetDatum(flush);
3960 
3961  if (XLogRecPtrIsInvalid(apply))
3962  nulls[5] = true;
3963  values[5] = LSNGetDatum(apply);
3964 
3965  /*
3966  * Treat a standby such as a pg_basebackup background process
3967  * which always returns an invalid flush location, as an
3968  * asynchronous standby.
3969  */
3970  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3971 
3972  if (writeLag < 0)
3973  nulls[6] = true;
3974  else
3975  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3976 
3977  if (flushLag < 0)
3978  nulls[7] = true;
3979  else
3980  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3981 
3982  if (applyLag < 0)
3983  nulls[8] = true;
3984  else
3985  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3986 
3987  values[9] = Int32GetDatum(priority);
3988 
3989  /*
3990  * More easily understood version of standby state. This is purely
3991  * informational.
3992  *
3993  * In quorum-based sync replication, the role of each standby
3994  * listed in synchronous_standby_names can be changing very
3995  * frequently. Any standbys considered as "sync" at one moment can
3996  * be switched to "potential" ones at the next moment. So, it's
3997  * basically useless to report "sync" or "potential" as their sync
3998  * states. We report just "quorum" for them.
3999  */
4000  if (priority == 0)
4001  values[10] = CStringGetTextDatum("async");
4002  else if (is_sync_standby)
4003  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
4004  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4005  else
4006  values[10] = CStringGetTextDatum("potential");
4007 
4008  if (replyTime == 0)
4009  nulls[11] = true;
4010  else
4011  values[11] = TimestampTzGetDatum(replyTime);
4012  }
4013 
4014  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4015  values, nulls);
4016  }
4017 
4018  return (Datum) 0;
4019 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5128
#define MemSet(start, val, len)
Definition: c.h:1020
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:74
Oid GetUserId(void)
Definition: miscinit.c:514
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:711
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:750
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3847
#define PG_STAT_GET_WAL_SENDERS_COLS
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3828
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2354 of file walsender.c.

2355 {
2356  bool changed = false;
2358 
2359  Assert(lsn != InvalidXLogRecPtr);
2360  SpinLockAcquire(&slot->mutex);
2361  if (slot->data.restart_lsn != lsn)
2362  {
2363  changed = true;
2364  slot->data.restart_lsn = lsn;
2365  }
2366  SpinLockRelease(&slot->mutex);
2367 
2368  if (changed)
2369  {
2373  }
2374 
2375  /*
2376  * One could argue that the slot should be saved to disk now, but that'd
2377  * be energy wasted - the worst thing lost information could cause here is
2378  * to give wrong information in a statistics view - we'll just potentially
2379  * be more conservative in removing files.
2380  */
2381 }
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1105
XLogRecPtr restart_lsn
Definition: slot.h:93
slock_t mutex
Definition: slot.h:151
void PhysicalWakeupLogicalWalSnd(void)
Definition: walsender.c:1733

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2492 of file walsender.c.

2493 {
2494  bool changed = false;
2496 
2497  SpinLockAcquire(&slot->mutex);
2499 
2500  /*
2501  * For physical replication we don't need the interlock provided by xmin
2502  * and effective_xmin since the consequences of a missed increase are
2503  * limited to query cancellations, so set both at once.
2504  */
2505  if (!TransactionIdIsNormal(slot->data.xmin) ||
2506  !TransactionIdIsNormal(feedbackXmin) ||
2507  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2508  {
2509  changed = true;
2510  slot->data.xmin = feedbackXmin;
2511  slot->effective_xmin = feedbackXmin;
2512  }
2513  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2514  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2515  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2516  {
2517  changed = true;
2518  slot->data.catalog_xmin = feedbackCatalogXmin;
2519  slot->effective_catalog_xmin = feedbackCatalogXmin;
2520  }
2521  SpinLockRelease(&slot->mutex);
2522 
2523  if (changed)
2524  {
2527  }
2528 }
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1049
TransactionId xmin
Definition: proc.h:173
TransactionId xmin
Definition: slot.h:82
TransactionId catalog_xmin
Definition: slot.h:90
TransactionId effective_catalog_xmin
Definition: slot.h:175
TransactionId effective_xmin
Definition: slot.h:174
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1733 of file walsender.c.

1734 {
1736 
1737  /*
1738  * If we are running in a standby, there is no need to wake up walsenders.
1739  * This is because we do not support syncing slots to cascading standbys,
1740  * so, there are no walsenders waiting for standbys to catch up.
1741  */
1742  if (RecoveryInProgress())
1743  return;
1744 
1747 }
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInStandbySlotNames(const char *slot_name)
Definition: slot.c:2547
#define SlotIsPhysical(slot)
Definition: slot.h:209
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInStandbySlotNames(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1614 of file walsender.c.

1615 {
1616  for (;;)
1617  {
1618  long sleeptime;
1619 
1620  /* Check for input from the client */
1622 
1623  /* die if timeout was reached */
1625 
1626  /* Send keepalive if the time has come */
1628 
1629  if (!pq_is_send_pending())
1630  break;
1631 
1633 
1634  /* Sleep until something happens or we time out */
1636  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1637 
1638  /* Clear any already-pending wakeups */
1640 
1642 
1643  /* Process any requests or signals received recently */
1644  if (ConfigReloadPending)
1645  {
1646  ConfigReloadPending = false;
1649  }
1650 
1651  /* Try to flush pending output to the client */
1652  if (pq_flush_if_writable() != 0)
1653  WalSndShutdown();
1654  }
1655 
1656  /* reactivate latch so WalSndLoop knows to continue */
1657  SetLatch(MyLatch);
1658 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
struct Latch * MyLatch
Definition: globals.c:60
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:632
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:402
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3687
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2740
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2210
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:4057
static void WalSndShutdown(void)
Definition: walsender.c:240
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2696

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2210 of file walsender.c.

2211 {
2212  unsigned char firstchar;
2213  int maxmsglen;
2214  int r;
2215  bool received = false;
2216 
2218 
2219  /*
2220  * If we already received a CopyDone from the frontend, any subsequent
2221  * message is the beginning of a new command, and should be processed in
2222  * the main processing loop.
2223  */
2224  while (!streamingDoneReceiving)
2225  {
2226  pq_startmsgread();
2227  r = pq_getbyte_if_available(&firstchar);
2228  if (r < 0)
2229  {
2230  /* unexpected error or EOF */
2232  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2233  errmsg("unexpected EOF on standby connection")));
2234  proc_exit(0);
2235  }
2236  if (r == 0)
2237  {
2238  /* no data available without blocking */
2239  pq_endmsgread();
2240  break;
2241  }
2242 
2243  /* Validate message type and set packet size limit */
2244  switch (firstchar)
2245  {
2246  case PqMsg_CopyData:
2247  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2248  break;
2249  case PqMsg_CopyDone:
2250  case PqMsg_Terminate:
2251  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2252  break;
2253  default:
2254  ereport(FATAL,
2255  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2256  errmsg("invalid standby message type \"%c\"",
2257  firstchar)));
2258  maxmsglen = 0; /* keep compiler quiet */
2259  break;
2260  }
2261 
2262  /* Read the message contents */
2264  if (pq_getmessage(&reply_message, maxmsglen))
2265  {
2267  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2268  errmsg("unexpected EOF on standby connection")));
2269  proc_exit(0);
2270  }
2271 
2272  /* ... and process it */
2273  switch (firstchar)
2274  {
2275  /*
2276  * 'd' means a standby reply wrapped in a CopyData packet.
2277  */
2278  case PqMsg_CopyData:
2280  received = true;
2281  break;
2282 
2283  /*
2284  * CopyDone means the standby requested to finish streaming.
2285  * Reply with CopyDone, if we had not sent that already.
2286  */
2287  case PqMsg_CopyDone:
2288  if (!streamingDoneSending)
2289  {
2290  pq_putmessage_noblock('c', NULL, 0);
2291  streamingDoneSending = true;
2292  }
2293 
2294  streamingDoneReceiving = true;
2295  received = true;
2296  break;
2297 
2298  /*
2299  * 'X' means that the standby is closing down the socket.
2300  */
2301  case PqMsg_Terminate:
2302  proc_exit(0);
2303 
2304  default:
2305  Assert(false); /* NOT REACHED */
2306  }
2307  }
2308 
2309  /*
2310  * Save the last reply timestamp if we've received at least one reply.
2311  */
2312  if (received)
2313  {
2315  waiting_for_ping_response = false;
2316  }
2317 }
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1003
void pq_endmsgread(void)
Definition: pqcomm.c:1164
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
static bool waiting_for_ping_response
Definition: walsender.c:182
static TimestampTz last_processing
Definition: walsender.c:173
static bool streamingDoneSending
Definition: walsender.c:190
static void ProcessStandbyMessage(void)
Definition: walsender.c:2323
static bool streamingDoneReceiving
Definition: walsender.c:191

References Assert, COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2572 of file walsender.c.

2573 {
2574  TransactionId feedbackXmin;
2575  uint32 feedbackEpoch;
2576  TransactionId feedbackCatalogXmin;
2577  uint32 feedbackCatalogEpoch;
2578  TimestampTz replyTime;
2579 
2580  /*
2581  * Decipher the reply message. The caller already consumed the msgtype
2582  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2583  * of this message.
2584  */
2585  replyTime = pq_getmsgint64(&reply_message);
2586  feedbackXmin = pq_getmsgint(&reply_message, 4);
2587  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2588  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2589  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2590 
2592  {
2593  char *replyTimeStr;
2594 
2595  /* Copy because timestamptz_to_str returns a static buffer */
2596  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2597 
2598  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2599  feedbackXmin,
2600  feedbackEpoch,
2601  feedbackCatalogXmin,
2602  feedbackCatalogEpoch,
2603  replyTimeStr);
2604 
2605  pfree(replyTimeStr);
2606  }
2607 
2608  /*
2609  * Update shared state for this WalSender process based on reply data from
2610  * standby.
2611  */
2612  {
2613  WalSnd *walsnd = MyWalSnd;
2614 
2615  SpinLockAcquire(&walsnd->mutex);
2616  walsnd->replyTime = replyTime;
2617  SpinLockRelease(&walsnd->mutex);
2618  }
2619 
2620  /*
2621  * Unset WalSender's xmins if the feedback message values are invalid.
2622  * This happens when the downstream turned hot_standby_feedback off.
2623  */
2624  if (!TransactionIdIsNormal(feedbackXmin)
2625  && !TransactionIdIsNormal(feedbackCatalogXmin))
2626  {
2628  if (MyReplicationSlot != NULL)
2629  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2630  return;
2631  }
2632 
2633  /*
2634  * Check that the provided xmin/epoch are sane, that is, not in the future
2635  * and not so far back as to be already wrapped around. Ignore if not.
2636  */
2637  if (TransactionIdIsNormal(feedbackXmin) &&
2638  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2639  return;
2640 
2641  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2642  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2643  return;
2644 
2645  /*
2646  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2647  * the xmin will be taken into account by GetSnapshotData() /
2648  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2649  * thereby prevent the generation of cleanup conflicts on the standby
2650  * server.
2651  *
2652  * There is a small window for a race condition here: although we just
2653  * checked that feedbackXmin precedes nextXid, the nextXid could have
2654  * gotten advanced between our fetching it and applying the xmin below,
2655  * perhaps far enough to make feedbackXmin wrap around. In that case the
2656  * xmin we set here would be "in the future" and have no effect. No point
2657  * in worrying about this since it's too late to save the desired data
2658  * anyway. Assuming that the standby sends us an increasing sequence of
2659  * xmins, this could only happen during the first reply cycle, else our
2660  * own xmin would prevent nextXid from advancing so far.
2661  *
2662  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2663  * is assumed atomic, and there's no real need to prevent concurrent
2664  * horizon determinations. (If we're moving our xmin forward, this is
2665  * obviously safe, and if we're moving it backwards, well, the data is at
2666  * risk already since a VACUUM could already have determined the horizon.)
2667  *
2668  * If we're using a replication slot we reserve the xmin via that,
2669  * otherwise via the walsender's PGPROC entry. We can only track the
2670  * catalog xmin separately when using a slot, so we store the least of the
2671  * two provided when not using a slot.
2672  *
2673  * XXX: It might make sense to generalize the ephemeral slot concept and
2674  * always use the slot mechanism to handle the feedback xmin.
2675  */
2676  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2677  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2678  else
2679  {
2680  if (TransactionIdIsNormal(feedbackCatalogXmin)
2681  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2682  MyProc->xmin = feedbackCatalogXmin;
2683  else
2684  MyProc->xmin = feedbackXmin;
2685  }
2686 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1853
unsigned int uint32
Definition: c.h:506
uint32 TransactionId
Definition: c.h:652
bool message_level_is_interesting(int elevel)
Definition: elog.c:276
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1695
void pfree(void *pointer)
Definition: mcxt.c:1520
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2492
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2541

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2323 of file walsender.c.

2324 {
2325  char msgtype;
2326 
2327  /*
2328  * Check message type from the first byte.
2329  */
2330  msgtype = pq_getmsgbyte(&reply_message);
2331 
2332  switch (msgtype)
2333  {
2334  case 'r':
2336  break;
2337 
2338  case 'h':
2340  break;
2341 
2342  default:
2344  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2345  errmsg("unexpected message type \"%c\"", msgtype)));
2346  proc_exit(0);
2347  }
2348 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2572
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2387

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2387 of file walsender.c.

2388 {
2389  XLogRecPtr writePtr,
2390  flushPtr,
2391  applyPtr;
2392  bool replyRequested;
2393  TimeOffset writeLag,
2394  flushLag,
2395  applyLag;
2396  bool clearLagTimes;
2397  TimestampTz now;
2398  TimestampTz replyTime;
2399 
2400  static bool fullyAppliedLastTime = false;
2401 
2402  /* the caller already consumed the msgtype byte */
2403  writePtr = pq_getmsgint64(&reply_message);
2404  flushPtr = pq_getmsgint64(&reply_message);
2405  applyPtr = pq_getmsgint64(&reply_message);
2406  replyTime = pq_getmsgint64(&reply_message);
2407  replyRequested = pq_getmsgbyte(&reply_message);
2408 
2410  {
2411  char *replyTimeStr;
2412 
2413  /* Copy because timestamptz_to_str returns a static buffer */
2414  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2415 
2416  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2417  LSN_FORMAT_ARGS(writePtr),
2418  LSN_FORMAT_ARGS(flushPtr),
2419  LSN_FORMAT_ARGS(applyPtr),
2420  replyRequested ? " (reply requested)" : "",
2421  replyTimeStr);
2422 
2423  pfree(replyTimeStr);
2424  }
2425 
2426  /* See if we can compute the round-trip lag for these positions. */
2428  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2429  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2430  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2431 
2432  /*
2433  * If the standby reports that it has fully replayed the WAL in two
2434  * consecutive reply messages, then the second such message must result
2435  * from wal_receiver_status_interval expiring on the standby. This is a
2436  * convenient time to forget the lag times measured when it last
2437  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2438  * until more WAL traffic arrives.
2439  */
2440  clearLagTimes = false;
2441  if (applyPtr == sentPtr)
2442  {
2443  if (fullyAppliedLastTime)
2444  clearLagTimes = true;
2445  fullyAppliedLastTime = true;
2446  }
2447  else
2448  fullyAppliedLastTime = false;
2449 
2450  /* Send a reply if the standby requested one. */
2451  if (replyRequested)
2453 
2454  /*
2455  * Update shared state for this WalSender process based on reply data from
2456  * standby.
2457  */
2458  {
2459  WalSnd *walsnd = MyWalSnd;
2460 
2461  SpinLockAcquire(&walsnd->mutex);
2462  walsnd->write = writePtr;
2463  walsnd->flush = flushPtr;
2464  walsnd->apply = applyPtr;
2465  if (writeLag != -1 || clearLagTimes)
2466  walsnd->writeLag = writeLag;
2467  if (flushLag != -1 || clearLagTimes)
2468  walsnd->flushLag = flushLag;
2469  if (applyLag != -1 || clearLagTimes)
2470  walsnd->applyLag = applyLag;
2471  walsnd->replyTime = replyTime;
2472  SpinLockRelease(&walsnd->mutex);
2473  }
2474 
2477 
2478  /*
2479  * Advance our local xmin horizon when the client confirmed a flush.
2480  */
2481  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2482  {
2485  else
2487  }
2488 }
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1834
#define SlotIsLogical(slot)
Definition: slot.h:210
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:431
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:165
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2354
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:4034
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4160

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 490 of file walsender.c.

491 {
492 #define READ_REPLICATION_SLOT_COLS 3
493  ReplicationSlot *slot;
495  TupOutputState *tstate;
496  TupleDesc tupdesc;
498  bool nulls[READ_REPLICATION_SLOT_COLS];
499 
501  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
502  TEXTOID, -1, 0);
503  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
504  TEXTOID, -1, 0);
505  /* TimeLineID is unsigned, so int4 is not wide enough. */
506  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
507  INT8OID, -1, 0);
508 
509  memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
510 
511  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
512  slot = SearchNamedReplicationSlot(cmd->slotname, false);
513  if (slot == NULL || !slot->in_use)
514  {
515  LWLockRelease(ReplicationSlotControlLock);
516  }
517  else
518  {
519  ReplicationSlot slot_contents;
520  int i = 0;
521 
522  /* Copy slot contents while holding spinlock */
523  SpinLockAcquire(&slot->mutex);
524  slot_contents = *slot;
525  SpinLockRelease(&slot->mutex);
526  LWLockRelease(ReplicationSlotControlLock);
527 
528  if (OidIsValid(slot_contents.data.database))
529  ereport(ERROR,
530  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
531  errmsg("cannot use %s with a logical replication slot",
532  "READ_REPLICATION_SLOT"));
533 
534  /* slot type */
535  values[i] = CStringGetTextDatum("physical");
536  nulls[i] = false;
537  i++;
538 
539  /* start LSN */
540  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
541  {
542  char xloc[64];
543 
544  snprintf(xloc, sizeof(xloc), "%X/%X",
545  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
546  values[i] = CStringGetTextDatum(xloc);
547  nulls[i] = false;
548  }
549  i++;
550 
551  /* timeline this WAL was produced on */
552  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
553  {
554  TimeLineID slots_position_timeline;
555  TimeLineID current_timeline;
556  List *timeline_history = NIL;
557 
558  /*
559  * While in recovery, use as timeline the currently-replaying one
560  * to get the LSN position's history.
561  */
562  if (RecoveryInProgress())
563  (void) GetXLogReplayRecPtr(&current_timeline);
564  else
565  current_timeline = GetWALInsertionTimeLine();
566 
567  timeline_history = readTimeLineHistory(current_timeline);
568  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
569  timeline_history);
570  values[i] = Int64GetDatum((int64) slots_position_timeline);
571  nulls[i] = false;
572  }
573  i++;
574 
576  }
577 
579  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
580  do_tup_output(tstate, values, nulls);
581  end_tup_output(tstate);
582 }
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
#define OidIsValid(objectId)
Definition: c.h:775
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1170
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1783
@ LW_SHARED
Definition: lwlock.h:115
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:464
List
Definition: pg_list.h:54
bool in_use
Definition: slot.h:154
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 589 of file walsender.c.

590 {
592  TupleDesc tupdesc;
594  char histfname[MAXFNAMELEN];
595  char path[MAXPGPATH];
596  int fd;
597  off_t histfilelen;
598  off_t bytesleft;
599  Size len;
600 
602 
603  /*
604  * Reply with a result set with one row, and two columns. The first col is
605  * the name of the history file, 2nd is the contents.
606  */
607  tupdesc = CreateTemplateTupleDesc(2);
608  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
609  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
610 
611  TLHistoryFileName(histfname, cmd->timeline);
612  TLHistoryFilePath(path, cmd->timeline);
613 
614  /* Send a RowDescription message */
615  dest->rStartup(dest, CMD_SELECT, tupdesc);
616 
617  /* Send a DataRow message */
619  pq_sendint16(&buf, 2); /* # of columns */
620  len = strlen(histfname);
621  pq_sendint32(&buf, len); /* col1 len */
622  pq_sendbytes(&buf, histfname, len);
623 
624  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
625  if (fd < 0)
626  ereport(ERROR,
628  errmsg("could not open file \"%s\": %m", path)));
629 
630  /* Determine file length and send it to client */
631  histfilelen = lseek(fd, 0, SEEK_END);
632  if (histfilelen < 0)
633  ereport(ERROR,
635  errmsg("could not seek to end of file \"%s\": %m", path)));
636  if (lseek(fd, 0, SEEK_SET) != 0)
637  ereport(ERROR,
639  errmsg("could not seek to beginning of file \"%s\": %m", path)));
640 
641  pq_sendint32(&buf, histfilelen); /* col2 len */
642 
643  bytesleft = histfilelen;
644  while (bytesleft > 0)
645  {
646  PGAlignedBlock rbuf;
647  int nread;
648 
649  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
650  nread = read(fd, rbuf.data, sizeof(rbuf));
652  if (nread < 0)
653  ereport(ERROR,
655  errmsg("could not read file \"%s\": %m",
656  path)));
657  else if (nread == 0)
658  ereport(ERROR,
660  errmsg("could not read file \"%s\": read %d of %zu",
661  path, nread, (Size) bytesleft)));
662 
663  pq_sendbytes(&buf, rbuf.data, nread);
664  bytesleft -= nread;
665  }
666 
667  if (CloseTransientFile(fd) != 0)
668  ereport(ERROR,
670  errmsg("could not close file \"%s\": %m", path)));
671 
672  pq_endmessage(&buf);
673 }
#define PG_BINARY
Definition: c.h:1273
size_t Size
Definition: c.h:605
int errcode_for_file_access(void)
Definition: elog.c:880
int CloseTransientFile(int fd)
Definition: fd.c:2809
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:265
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PqMsg_DataRow
Definition: protocol.h:43
TimeLineID timeline
Definition: replnodes.h:120
char data[BLCKSZ]
Definition: c.h:1119
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:82
static void pgstat_report_wait_end(void)
Definition: wait_event.h:98
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, generate_unaccent_rules::dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1452 of file walsender.c.

1453 {
1455  QueryCompletion qc;
1456 
1457  /* make sure that our requirements are still fulfilled */
1459 
1461 
1462  ReplicationSlotAcquire(cmd->slotname, true);
1463 
1464  /*
1465  * Force a disconnect, so that the decoding code doesn't need to care
1466  * about an eventual switch from running in recovery, to running in a
1467  * normal environment. Client code is expected to handle reconnects.
1468  */
1470  {
1471  ereport(LOG,
1472  (errmsg("terminating walsender process after promotion")));
1473  got_STOPPING = true;
1474  }
1475 
1476  /*
1477  * Create our decoding context, making it start at the previously ack'ed
1478  * position.
1479  *
1480  * Do this before sending a CopyBothResponse message, so that any errors
1481  * are reported early.
1482  */
1484  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1485  XL_ROUTINE(.page_read = logical_read_xlog_page,
1486  .segment_open = WalSndSegmentOpen,
1487  .segment_close = wal_segment_close),
1491 
1493 
1494  /* Send a CopyBothResponse message, and start streaming */
1496  pq_sendbyte(&buf, 0);
1497  pq_sendint16(&buf, 0);
1498  pq_endmessage(&buf);
1499  pq_flush();
1500 
1501  /* Start reading WAL from the oldest required WAL. */
1504 
1505  /*
1506  * Report the location after which we'll send out further commits as the
1507  * current sentPtr.
1508  */
1510 
1511  /* Also update the sent position status in shared memory */
1515 
1516  replication_active = true;
1517 
1519 
1520  /* Main loop of walsender */
1522 
1525 
1526  replication_active = false;
1527  if (got_STOPPING)
1528  proc_exit(0);
1530 
1531  /* Get out of COPY mode (CommandComplete). */
1532  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1533  EndCommand(&qc, DestRemote, false);
1534 }
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:169
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:495
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:540
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:97
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2767
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:208
static void XLogSendLogical(void)
Definition: walsender.c:3376
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:231

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 819 of file walsender.c.

820 {
822  XLogRecPtr FlushPtr;
823  TimeLineID FlushTLI;
824 
825  /* create xlogreader for physical replication */
826  xlogreader =
828  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
829  .segment_close = wal_segment_close),
830  NULL);
831 
832  if (!xlogreader)
833  ereport(ERROR,
834  (errcode(ERRCODE_OUT_OF_MEMORY),
835  errmsg("out of memory"),
836  errdetail("Failed while allocating a WAL reading processor.")));
837 
838  /*
839  * We assume here that we're logging enough information in the WAL for
840  * log-shipping, since this is checked in PostmasterMain().
841  *
842  * NOTE: wal_level can only change at shutdown, so in most cases it is
843  * difficult for there to be WAL data that we can still see that was
844  * written at wal_level='minimal'.
845  */
846 
847  if (cmd->slotname)
848  {
849  ReplicationSlotAcquire(cmd->slotname, true);
851  ereport(ERROR,
852  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
853  errmsg("cannot use a logical replication slot for physical replication")));
854 
855  /*
856  * We don't need to verify the slot's restart_lsn here; instead we
857  * rely on the caller requesting the starting point to use. If the
858  * WAL segment doesn't exist, we'll fail later.
859  */
860  }
861 
862  /*
863  * Select the timeline. If it was given explicitly by the client, use
864  * that. Otherwise use the timeline of the last replayed record.
865  */
868  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
869  else
870  FlushPtr = GetFlushRecPtr(&FlushTLI);
871 
872  if (cmd->timeline != 0)
873  {
874  XLogRecPtr switchpoint;
875 
876  sendTimeLine = cmd->timeline;
877  if (sendTimeLine == FlushTLI)
878  {
879  sendTimeLineIsHistoric = false;
881  }
882  else
883  {
884  List *timeLineHistory;
885 
886  sendTimeLineIsHistoric = true;
887 
888  /*
889  * Check that the timeline the client requested exists, and the
890  * requested start location is on that timeline.
891  */
892  timeLineHistory = readTimeLineHistory(FlushTLI);
893  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
895  list_free_deep(timeLineHistory);
896 
897  /*
898  * Found the requested timeline in the history. Check that
899  * requested startpoint is on that timeline in our history.
900  *
901  * This is quite loose on purpose. We only check that we didn't
902  * fork off the requested timeline before the switchpoint. We
903  * don't check that we switched *to* it before the requested
904  * starting point. This is because the client can legitimately
905  * request to start replication from the beginning of the WAL
906  * segment that contains switchpoint, but on the new timeline, so
907  * that it doesn't end up with a partial segment. If you ask for
908  * too old a starting point, you'll get an error later when we
909  * fail to find the requested WAL segment in pg_wal.
910  *
911  * XXX: we could be more strict here and only allow a startpoint
912  * that's older than the switchpoint, if it's still in the same
913  * WAL segment.
914  */
915  if (!XLogRecPtrIsInvalid(switchpoint) &&
916  switchpoint < cmd->startpoint)
917  {
918  ereport(ERROR,
919  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
921  cmd->timeline),
922  errdetail("This server's history forked from timeline %u at %X/%X.",
923  cmd->timeline,
924  LSN_FORMAT_ARGS(switchpoint))));
925  }
926  sendTimeLineValidUpto = switchpoint;
927  }
928  }
929  else
930  {
931  sendTimeLine = FlushTLI;
933  sendTimeLineIsHistoric = false;
934  }
935 
937 
938  /* If there is nothing to stream, don't even enter COPY mode */
940  {
941  /*
942  * When we first start replication the standby will be behind the
943  * primary. For some applications, for example synchronous
944  * replication, it is important to have a clear state for this initial
945  * catchup mode, so we can trigger actions when we change streaming
946  * state later. We may stay in this state for a long time, which is
947  * exactly why we want to be able to monitor whether or not we are
948  * still here.
949  */
951 
952  /* Send a CopyBothResponse message, and start streaming */
954  pq_sendbyte(&buf, 0);
955  pq_sendint16(&buf, 0);
956  pq_endmessage(&buf);
957  pq_flush();
958 
959  /*
960  * Don't allow a request to stream from a future point in WAL that
961  * hasn't been flushed to disk in this server yet.
962  */
963  if (FlushPtr < cmd->startpoint)
964  {
965  ereport(ERROR,
966  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
968  LSN_FORMAT_ARGS(FlushPtr))));
969  }
970 
971  /* Start streaming from the requested point */
972  sentPtr = cmd->startpoint;
973 
974  /* Initialize shared memory status, too */
978 
980 
981  /* Main loop of walsender */
982  replication_active = true;
983 
985 
986  replication_active = false;
987  if (got_STOPPING)
988  proc_exit(0);
990 
992  }
993 
994  if (cmd->slotname)
996 
997  /*
998  * Copy is finished now. Send a single-row result set indicating the next
999  * timeline.
1000  */
1002  {
1003  char startpos_str[8 + 1 + 8 + 1];
1004  DestReceiver *dest;
1005  TupOutputState *tstate;
1006  TupleDesc tupdesc;
1007  Datum values[2];
1008  bool nulls[2] = {0};
1009 
1010  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
1012 
1014 
1015  /*
1016  * Need a tuple descriptor representing two columns. int8 may seem
1017  * like a surprising data type for this, but in theory int4 would not
1018  * be wide enough for this, as TimeLineID is unsigned.
1019  */
1020  tupdesc = CreateTemplateTupleDesc(2);
1021  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1022  INT8OID, -1, 0);
1023  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1024  TEXTOID, -1, 0);
1025 
1026  /* prepare for projection of tuple */
1027  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1028 
1030  values[1] = CStringGetTextDatum(startpos_str);
1031 
1032  /* send it to dest */
1033  do_tup_output(tstate, values, nulls);
1034 
1035  end_tup_output(tstate);
1036  }
1037 
1038  /* Send CommandComplete message */
1039  EndReplicationCommand("START_STREAMING");
1040 }
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1203
void list_free_deep(List *list)
Definition: list.c:1560
TimeLineID timeline
Definition: replnodes.h:96
static void XLogSendPhysical(void)
Definition: walsender.c:3066
int wal_segment_size
Definition: xlog.c:143
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:106

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2541 of file walsender.c.

2542 {
2543  FullTransactionId nextFullXid;
2544  TransactionId nextXid;
2545  uint32 nextEpoch;
2546 
2547  nextFullXid = ReadNextFullTransactionId();
2548  nextXid = XidFromFullTransactionId(nextFullXid);
2549  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2550 
2551  if (xid <= nextXid)
2552  {
2553  if (epoch != nextEpoch)
2554  return false;
2555  }
2556  else
2557  {
2558  if (epoch + 1 != nextEpoch)
2559  return false;
2560  }
2561 
2562  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2563  return false; /* epoch OK, but it's wrapped around */
2564 
2565  return true;
2566 }
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 679 of file walsender.c.

680 {
681  MemoryContext mcxt;
683  off_t offset = 0;
685 
686  /*
687  * parsing the manifest will use the cryptohash stuff, which requires a
688  * resource owner
689  */
690  Assert(CurrentResourceOwner == NULL);
691  CurrentResourceOwner = ResourceOwnerCreate(NULL, "base backup");
692 
693  /* Prepare to read manifest data into a temporary context. */
695  "incremental backup information",
697  ib = CreateIncrementalBackupInfo(mcxt);
698 
699  /* Send a CopyInResponse message */
700  pq_beginmessage(&buf, 'G');
701  pq_sendbyte(&buf, 0);
702  pq_sendint16(&buf, 0);
704  pq_flush();
705 
706  /* Receive packets from client until done. */
707  while (HandleUploadManifestPacket(&buf, &offset, ib))
708  ;
709 
710  /* Finish up manifest processing. */
712 
713  /*
714  * Discard any old manifest information and arrange to preserve the new
715  * information we just got.
716  *
717  * We assume that MemoryContextDelete and MemoryContextSetParent won't
718  * fail, and thus we shouldn't end up bailing out of here in such a way as
719  * to leave dangling pointers.
720  */
721  if (uploaded_manifest_mcxt != NULL)
724  uploaded_manifest = ib;
725  uploaded_manifest_mcxt = mcxt;
726 
727  /* clean up the resource owner we created */
728  WalSndResourceCleanup(true);
729 }
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:637
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:314
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:413
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:743
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:362
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:148

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), ResourceOwnerCreate(), uploaded_manifest, uploaded_manifest_mcxt, and WalSndResourceCleanup().

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2740 of file walsender.c.

2741 {
2742  TimestampTz timeout;
2743 
2744  /* don't bail out if we're doing something that doesn't require timeouts */
2745  if (last_reply_timestamp <= 0)
2746  return;
2747 
2750 
2751  if (wal_sender_timeout > 0 && last_processing >= timeout)
2752  {
2753  /*
2754  * Since typically expiration of replication timeout means
2755  * communication problem, we don't send the error message to the
2756  * standby.
2757  */
2759  (errmsg("terminating walsender process due to replication timeout")));
2760 
2761  WalSndShutdown();
2762  }
2763 }
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:123

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2696 of file walsender.c.

2697 {
2698  long sleeptime = 10000; /* 10 s */
2699 
2701  {
2702  TimestampTz wakeup_time;
2703 
2704  /*
2705  * At the latest stop sleeping once wal_sender_timeout has been
2706  * reached.
2707  */
2710 
2711  /*
2712  * If no ping has been sent yet, wakeup when it's time to do so.
2713  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2714  * the timeout passed without a response.
2715  */
2718  wal_sender_timeout / 2);
2719 
2720  /* Compute relative time until wakeup. */
2721  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2722  }
2723 
2724  return sleeptime;
2725 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1766

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3461 of file walsender.c.

3462 {
3463  XLogRecPtr replicatedPtr;
3464 
3465  /* ... let's just be real sure we're caught up ... */
3466  send_data();
3467 
3468  /*
3469  * To figure out whether all WAL has successfully been replicated, check
3470  * flush location if valid, write otherwise. Tools like pg_receivewal will
3471  * usually (unless in synchronous mode) return an invalid flush location.
3472  */
3473  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3475 
3476  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3477  !pq_is_send_pending())
3478  {
3479  QueryCompletion qc;
3480 
3481  /* Inform the standby that XLOG streaming is done */
3482  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3483  EndCommand(&qc, DestRemote, false);
3484  pq_flush();
3485 
3486  proc_exit(0);
3487  }
3490 }
static bool WalSndCaughtUp
Definition: walsender.c:194

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 327 of file walsender.c.

328 {
332 
333  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
335 
336  if (MyReplicationSlot != NULL)
338 
339  ReplicationSlotCleanup(false);
340 
341  replication_active = false;
342 
343  /*
344  * If there is a transaction in progress, it will clean up our
345  * ResourceOwner, but if a replication command set up a resource owner
346  * without a transaction, we've got to clean that up now.
347  */
349  WalSndResourceCleanup(false);
350 
351  if (got_STOPPING || got_SIGUSR2)
352  proc_exit(0);
353 
354  /* Revert back to startup state */
356 }
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1878
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:197
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4933

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3828 of file walsender.c.

3829 {
3830  switch (state)
3831  {
3832  case WALSNDSTATE_STARTUP:
3833  return "startup";
3834  case WALSNDSTATE_BACKUP:
3835  return "backup";
3836  case WALSNDSTATE_CATCHUP:
3837  return "catchup";
3838  case WALSNDSTATE_STREAMING:
3839  return "streaming";
3840  case WALSNDSTATE_STOPPING:
3841  return "stopping";
3842  }
3843  return "UNKNOWN";
3844 }
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3745 of file walsender.c.

3746 {
3747  int i;
3748 
3749  for (i = 0; i < max_wal_senders; i++)
3750  {
3751  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3752  pid_t pid;
3753 
3754  SpinLockAcquire(&walsnd->mutex);
3755  pid = walsnd->pid;
3756  SpinLockRelease(&walsnd->mutex);
3757 
3758  if (pid == 0)
3759  continue;
3760 
3762  }
3763 }
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:257
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 4034 of file walsender.c.

4035 {
4036  elog(DEBUG2, "sending replication keepalive");
4037 
4038  /* construct the message... */
4040  pq_sendbyte(&output_message, 'k');
4041  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
4043  pq_sendbyte(&output_message, requestReply ? 1 : 0);
4044 
4045  /* ... and send it wrapped in CopyData */
4047 
4048  /* Set local flag */
4049  if (requestReply)
4051 }
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4057 of file walsender.c.

4058 {
4059  TimestampTz ping_time;
4060 
4061  /*
4062  * Don't send keepalive messages if timeouts are globally disabled or
4063  * we're doing something not partaking in timeouts.
4064  */
4065  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
4066  return;
4067 
4069  return;
4070 
4071  /*
4072  * If half of wal_sender_timeout has lapsed without receiving any reply
4073  * from the standby, send a keep-alive message to the standby requesting
4074  * an immediate reply.
4075  */
4077  wal_sender_timeout / 2);
4078  if (last_processing >= ping_time)
4079  {
4081 
4082  /* Try to flush pending output to the client */
4083  if (pq_flush_if_writable() != 0)
4084  WalSndShutdown();
4085  }
4086 }

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2970 of file walsender.c.

2971 {
2972  WalSnd *walsnd = MyWalSnd;
2973 
2974  Assert(walsnd != NULL);
2975 
2976  MyWalSnd = NULL;
2977 
2978  SpinLockAcquire(&walsnd->mutex);
2979  /* clear latch while holding the spinlock, so it can safely be read */
2980  walsnd->latch = NULL;
2981  /* Mark WalSnd struct as no longer being in use. */
2982  walsnd->pid = 0;
2983  SpinLockRelease(&walsnd->mutex);
2984 }

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3582 of file walsender.c.

3583 {
3584  got_SIGUSR2 = true;
3585  SetLatch(MyLatch);
3586 }

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2767 of file walsender.c.

2768 {
2769  /*
2770  * Initialize the last reply timestamp. That enables timeout processing
2771  * from hereon.
2772  */
2774  waiting_for_ping_response = false;
2775 
2776  /*
2777  * Loop until we reach the end of this timeline or the client requests to
2778  * stop streaming.
2779  */
2780  for (;;)
2781  {
2782  /* Clear any already-pending wakeups */
2784 
2786 
2787  /* Process any requests or signals received recently */
2788  if (ConfigReloadPending)
2789  {
2790  ConfigReloadPending = false;
2793  }
2794 
2795  /* Check for input from the client */
2797 
2798  /*
2799  * If we have received CopyDone from the client, sent CopyDone
2800  * ourselves, and the output buffer is empty, it's time to exit
2801  * streaming.
2802  */
2804  !pq_is_send_pending())
2805  break;
2806 
2807  /*
2808  * If we don't have any pending data in the output buffer, try to send
2809  * some more. If there is some, we don't bother to call send_data
2810  * again until we've flushed it ... but we'd better assume we are not
2811  * caught up.
2812  */
2813  if (!pq_is_send_pending())
2814  send_data();
2815  else
2816  WalSndCaughtUp = false;
2817 
2818  /* Try to flush pending output to the client */
2819  if (pq_flush_if_writable() != 0)
2820  WalSndShutdown();
2821 
2822  /* If nothing remains to be sent right now ... */
2824  {
2825  /*
2826  * If we're in catchup state, move to streaming. This is an
2827  * important state change for users to know about, since before
2828  * this point data loss might occur if the primary dies and we
2829  * need to failover to the standby. The state change is also
2830  * important for synchronous replication, since commits that
2831  * started to wait at that point might wait for some time.
2832  */
2834  {
2835  ereport(DEBUG1,
2836  (errmsg_internal("\"%s\" has now caught up with upstream server",
2837  application_name)));
2839  }
2840 
2841  /*
2842  * When SIGUSR2 arrives, we send any outstanding logs up to the
2843  * shutdown checkpoint record (i.e., the latest record), wait for
2844  * them to be replicated to the standby, and exit. This may be a
2845  * normal termination at shutdown, or a promotion, the walsender
2846  * is not sure which.
2847  */
2848  if (got_SIGUSR2)
2849  WalSndDone(send_data);
2850  }
2851 
2852  /* Check for replication timeout. */
2854 
2855  /* Send keepalive if the time has come */
2857 
2858  /*
2859  * Block if we have unsent data. XXX For logical replication, let
2860  * WalSndWaitForWal() handle any other blocking; idle receivers need
2861  * its additional actions. For physical replication, also block if
2862  * caught up; its send_data does not block.
2863  */
2864  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2867  {
2868  long sleeptime;
2869  int wakeEvents;
2870 
2872  wakeEvents = WL_SOCKET_READABLE;
2873  else
2874  wakeEvents = 0;
2875 
2876  /*
2877  * Use fresh timestamp, not last_processing, to reduce the chance
2878  * of reaching wal_sender_timeout before sending a keepalive.
2879  */
2881 
2882  if (pq_is_send_pending())
2883  wakeEvents |= WL_SOCKET_WRITEABLE;
2884 
2885  /* Sleep until something happens or we time out */
2886  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2887  }
2888  }
2889 }
char * application_name
Definition: guc_tables.c:545
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3461

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1545 of file walsender.c.

1546 {
1547  /* can't have sync rep confused by sending the same LSN several times */
1548  if (!last_write)
1549  lsn = InvalidXLogRecPtr;
1550 
1551  resetStringInfo(ctx->out);
1552 
1553  pq_sendbyte(ctx->out, 'w');
1554  pq_sendint64(ctx->out, lsn); /* dataStart */
1555  pq_sendint64(ctx->out, lsn); /* walEnd */
1556 
1557  /*
1558  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1559  * reserve space here.
1560  */
1561  pq_sendint64(ctx->out, 0); /* sendtime */
1562 }
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 362 of file walsender.c.

363 {
364  ResourceOwner resowner;
365 
366  if (CurrentResourceOwner == NULL)
367  return;
368 
369  /*
370  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
371  * in a local variable and clear it first.
372  */
373  resowner = CurrentResourceOwner;
374  CurrentResourceOwner = NULL;
375 
376  /* Now we can release resources and delete it. */
377  ResourceOwnerRelease(resowner,
378  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
379  ResourceOwnerRelease(resowner,
380  RESOURCE_RELEASE_LOCKS, isCommit, true);
381  ResourceOwnerRelease(resowner,
382  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
383  ResourceOwnerDelete(resowner);
384 }
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:648
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:854
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:55
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:54
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:56

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), UploadManifest(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3537 of file walsender.c.

3538 {
3539  int i;
3540 
3541  for (i = 0; i < max_wal_senders; i++)
3542  {
3543  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3544 
3545  SpinLockAcquire(&walsnd->mutex);
3546  if (walsnd->pid == 0)
3547  {
3548  SpinLockRelease(&walsnd->mutex);
3549  continue;
3550  }
3551  walsnd->needreload = true;
3552  SpinLockRelease(&walsnd->mutex);
3553  }
3554 }

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState * state,
XLogSegNo  nextSegNo,
TimeLineID * tli_p 
)
static

Definition at line 2988 of file walsender.c.

2990 {
2991  char path[MAXPGPATH];
2992 
2993  /*-------
2994  * When reading from a historic timeline, and there is a timeline switch
2995  * within this segment, read from the WAL segment belonging to the new
2996  * timeline.
2997  *
2998  * For example, imagine that this server is currently on timeline 5, and
2999  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3000  * 0/13002088. In pg_wal, we have these files:
3001  *
3002  * ...
3003  * 000000040000000000000012
3004  * 000000040000000000000013
3005  * 000000050000000000000013
3006  * 000000050000000000000014
3007  * ...
3008  *
3009  * In this situation, when requested to send the WAL from segment 0x13, on
3010  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3011  * recovery prefers files from newer timelines, so if the segment was
3012  * restored from the archive on this server, the file belonging to the old
3013  * timeline, 000000040000000000000013, might not exist. Their contents are
3014  * equal up to the switchpoint, because at a timeline switch, the used
3015  * portion of the old segment is copied to the new file.
3016  */
3017  *tli_p = sendTimeLine;
3018  if (sendTimeLineIsHistoric)
3019  {
3020  XLogSegNo endSegNo;
3021 
3022  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3023  if (nextSegNo == endSegNo)
3024  *tli_p = sendTimeLineNextTLI;
3025  }
3026 
3027  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3028  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3029  if (state->seg.ws_file >= 0)
3030  return;
3031 
3032  /*
3033  * If the file is not found, assume it's because the standby asked for a
3034  * too old WAL segment that has already been removed or recycled.
3035  */
3036  if (errno == ENOENT)
3037  {
3038  char xlogfname[MAXFNAMELEN];
3039  int save_errno = errno;
3040 
3041  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
3042  errno = save_errno;
3043  ereport(ERROR,
3044  (errcode_for_file_access(),
3045  errmsg("requested WAL segment %s has already been removed",
3046  xlogfname)));
3047  }
3048  else
3049  ereport(ERROR,
3050  (errcode_for_file_access(),
3051  errmsg("could not open file \"%s\": %m",
3052  path)));
3053 }
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1087
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3809 of file walsender.c.

3810 {
3811  WalSnd *walsnd = MyWalSnd;
3812 
3813  Assert(am_walsender);
3814 
3815  if (walsnd->state == state)
3816  return;
3817 
3818  SpinLockAcquire(&walsnd->mutex);
3819  walsnd->state = state;
3820  SpinLockRelease(&walsnd->mutex);
3821 }

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3621 of file walsender.c.

3622 {
3623  bool found;
3624  int i;
3625 
3626  WalSndCtl = (WalSndCtlData *)
3627  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3628 
3629  if (!found)
3630  {
3631  /* First time through, so initialize */
3632  MemSet(WalSndCtl, 0, WalSndShmemSize());
3633 
3634  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3635  dlist_init(&(WalSndCtl->SyncRepQueue[i]));
3636 
3637  for (i = 0; i < max_wal_senders; i++)
3638  {
3639  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3640 
3641  SpinLockInit(&walsnd->mutex);
3642  }
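3643 
3644  ConditionVariableInit(&WalSndCtl->wal_flush_cv);
3645  ConditionVariableInit(&WalSndCtl->wal_replay_cv);
3646  ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv);
3647  }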
3648 }
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:60
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3609

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3609 of file walsender.c.

3610 {
3611  Size size = 0;
3612 
3613  size = offsetof(WalSndCtlData, walsnds);
3614  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3615 
3616  return size;
3617 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_wal_senders, mul_size(), and size.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 240 of file walsender.c.

279 {
281 
282  /* Create a per-walsender data structure in shared memory */
284 
285  /*
286  * We don't currently need any ResourceOwner in a walsender process, but
287  * if we did, we could call CreateAuxProcessResourceOwner here.
288  */
289 
290  /*
291  * Let postmaster know that we're a WAL sender. Once we've declared us as
292  * a WAL sender process, postmaster will let us outlive the bgwriter and
293  * kill us last in the shutdown sequence, so we get a chance to stream all
294  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
295  * there's no going back, and we mustn't write any WAL records after this.
296  */
299 
300  /*
301  * If the client didn't specify a database to connect to, show in PGPROC
302  * that our advertised xmin should affect vacuum horizons in all
303  * databases. This allows physical replication clients to send hot
304  * standby feedback that will delay vacuum cleanup in all databases.
305  */
306  if (MyDatabaseId == InvalidOid)
307  {
309  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
312  LWLockRelease(ProcArrayLock);
313  }
314 
315  /* Initialize empty timestamp buffer for lag tracking. */
317 }
@ LW_EXCLUSIVE
Definition: lwlock.h:114
MemoryContext TopMemoryContext
Definition: mcxt.c:149
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1214
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:339
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
PROC_HDR * ProcGlobal
Definition: proc.c:78
uint8 statusFlags
Definition: proc.h:238
int pgxactoff
Definition: proc.h:180
uint8 * statusFlags
Definition: proc.h:395
static void InitWalSenderSlot(void)
Definition: walsender.c:2893

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3590 of file walsender.c.

3591 {
3592  /* Set up signal handlers */
3593  pqsignal(SIGHUP, SignalHandlerForConfigReload);
3594  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3595  pqsignal(SIGTERM, die); /* request shutdown */
3596  /* SIGQUIT handler was already set up by InitPostmasterChild */
3597  InitializeTimeouts(); /* establishes SIGALRM handler */
3598  pqsignal(SIGPIPE, SIG_IGN);
3599  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3600  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3601  * shutdown */
3602 
3603  /* Reset some signals that are accepted by postmaster but not here */
3604  pqsignal(SIGCHLD, SIG_DFL);
3605 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3002
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:635
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3582
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1668 of file walsender.c.

1670 {
1671  static TimestampTz sendTime = 0;
1672  TimestampTz now = GetCurrentTimestamp();
1673  bool pending_writes = false;
1674  bool end_xact = ctx->end_xact;
1675 
1676  /*
1677  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1678  * avoid flooding the lag tracker when we commit frequently.
1679  *
1680  * We don't have a mechanism to get the ack for any LSN other than end
1681  * xact LSN from the downstream. So, we track lag only for end of
1682  * transaction LSN.
1683  */
1684 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1685  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1686  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1687  {
1688  LagTrackerWrite(lsn, now);
1689  sendTime = now;
1690  }
1691 
1692  /*
1693  * When skipping empty transactions in synchronous replication, we send a
1694  * keepalive message to avoid delaying such transactions.
1695  *
1696  * It is okay to check sync_standbys_defined flag without lock here as in
1697  * the worst case we will just send an extra keepalive message when it is
1698  * really not required.
1699  */
1700  if (skipped_xact &&
1701  SyncRepRequested() &&
1702  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1703  {
1704  WalSndKeepalive(false, lsn);
1705 
1706  /* Try to flush pending output to the client */
1707  if (pq_flush_if_writable() != 0)
1708  WalSndShutdown();
1709 
1710  /* If we have pending write here, make sure it's actually flushed */
1711  if (pq_is_send_pending())
1712  pending_writes = true;
1713  }
1714 
1715  /*
1716  * Process pending writes if any or try to send a keepalive if required.
1717  * We don't need to try sending keep alive messages at the transaction end
1718  * as that will be done at a later point in time. This is required only
1719  * for large transactions where we don't send any changes to the
1720  * downstream and the receiver can timeout due to that.
1721  */
1722  if (pending_writes || (!end_xact &&
1723  now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1724  wal_sender_timeout / 2)))
1725  ProcessPendingWrites();
1726 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1790
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1614
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:4095

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3687 of file walsender.c.

3688 {
3689  WaitEvent event;
3690 
3691  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3692 
3693  /*
3694  * We use a condition variable to efficiently wake up walsenders in
3695  * WalSndWakeup().
3696  *
3697  * Every walsender prepares to sleep on a shared memory CV. Note that it
3698  * just prepares to sleep on the CV (i.e., adds itself to the CV's
3699  * waitlist), but does not actually wait on the CV (IOW, it never calls
3700  * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3701  * waiting, because we also need to wait for socket events. The processes
3702  * (startup process, walreceiver etc.) wanting to wake up walsenders use
3703  * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3704  * walsenders come out of WaitEventSetWait().
3705  *
3706  * This approach is simple and efficient because, one doesn't have to loop
3707  * through all the walsenders slots, with a spinlock acquisition and
3708  * release for every iteration, just to wake up only the waiting
3709  * walsenders. It makes WalSndWakeup() callers' life easy.
3710  *
3711  * XXX: A desirable future improvement would be to add support for CVs
3712  * into WaitEventSetWait().
3713  *
3714  * And, we use separate shared memory CVs for physical and logical
3715  * walsenders for selective wake ups, see WalSndWakeup() for more details.
3716  *
3717  * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3718  * until awakened by physical walsenders after the walreceiver confirms
3719  * the receipt of the LSN.
3720  */
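3721  if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
3722  ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3723  else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
3724  ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
3725  else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
3726  ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
3727 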
3728  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3729  (event.events & WL_POSTMASTER_DEATH))
3730  {
3731  ConditionVariableCancelSleep();
3732  proc_exit(1);
3733  }
3734 
3735  ConditionVariableCancelSleep();
3736 }
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:1049
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1424
#define WL_POSTMASTER_DEATH
Definition: latch.h:131
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:165
uint32 events
Definition: latch.h:155

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1818 of file walsender.c.

1819 {
1820  int wakeEvents;
1821  uint32 wait_event = 0;
1822  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1823 
1824  /*
1825  * Fast path to avoid acquiring the spinlock in case we already know we
1826  * have enough WAL available and all the standby servers have confirmed
1827  * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1828  * if we're far behind.
1829  */
1830  if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
1831  !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1832  return RecentFlushPtr;
1833 
1834  /*
1835  * Within the loop, we wait for the necessary WALs to be flushed to disk
1836  * first, followed by waiting for standbys to catch up if there are enough
1837  * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1838  */
1839  for (;;)
1840  {
1841  bool wait_for_standby_at_stop = false;
1842  long sleeptime;
1843 
1844  /* Clear any already-pending wakeups */
1845  ResetLatch(MyLatch);
1846 
1847  CHECK_FOR_INTERRUPTS();
1848 
1849  /* Process any requests or signals received recently */
1850  if (ConfigReloadPending)
1851  {
1852  ConfigReloadPending = false;
1853  ProcessConfigFile(PGC_SIGHUP);
1854  SyncRepInitConfig();
1855  }
1856 
1857  /* Check for input from the client */
1858  ProcessRepliesIfAny();
1859 
1860  /*
1861  * If we're shutting down, trigger pending WAL to be written out,
1862  * otherwise we'd possibly end up waiting for WAL that never gets
1863  * written, because walwriter has shut down already.
1864  */
1865  if (got_STOPPING)
1866  XLogBackgroundFlush();
1867 
1868  /*
1869  * To avoid the scenario where standbys need to catch up to a newer
1870  * WAL location in each iteration, we update our idea of the currently
1871  * flushed position only if we are not waiting for standbys to catch
1872  * up.
1873  */
1874  if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1875  {
1876  if (!RecoveryInProgress())
1877  RecentFlushPtr = GetFlushRecPtr(NULL);
1878  else
1879  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1880  }
1881 
1882  /*
1883  * If postmaster asked us to stop and the standby slots have caught up
1884  * to the flushed position, don't wait anymore.
1885  *
1886  * It's important to do this check after the recomputation of
1887  * RecentFlushPtr, so we can send all remaining data before shutting
1888  * down.
1889  */
1890  if (got_STOPPING)
1891  {
1892  if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
1893  wait_for_standby_at_stop = true;
1894  else
1895  break;
1896  }
1897 
1898  /*
1899  * We only send regular messages to the client for full decoded
1900  * transactions, but a synchronous replication and walsender shutdown
1901  * possibly are waiting for a later location. So, before sleeping, we
1902  * send a ping containing the flush location. If the receiver is
1903  * otherwise idle, this keepalive will trigger a reply. Processing the
1904  * reply will update these MyWalSnd locations.
1905  */
1906  if (MyWalSnd->flush < sentPtr &&
1907  MyWalSnd->write < sentPtr &&
1908  !waiting_for_ping_response)
1909  WalSndKeepalive(false, InvalidXLogRecPtr);
1910 
1911  /*
1912  * Exit the loop if already caught up and doesn't need to wait for
1913  * standby slots.
1914  */
1915  if (!wait_for_standby_at_stop &&
1916  !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1917  break;
1918 
1919  /*
1920  * Waiting for new WAL or waiting for standbys to catch up. Since we
1921  * need to wait, we're now caught up.
1922  */
1923  WalSndCaughtUp = true;
1924 
1925  /*
1926  * Try to flush any pending output to the client.
1927  */
1928  if (pq_flush_if_writable() != 0)
1929  WalSndShutdown();
1930 
1931  /*
1932  * If we have received CopyDone from the client, sent CopyDone
1933  * ourselves, and the output buffer is empty, it's time to exit
1934  * streaming, so fail the current WAL fetch request.
1935  */
1936  if (streamingDoneReceiving && streamingDoneSending &&
1937  !pq_is_send_pending())
1938  break;
1939 
1940  /* die if timeout was reached */
1941  WalSndCheckTimeOut();
1942 
1943  /* Send keepalive if the time has come */
1944  WalSndKeepaliveIfNecessary();
1945 
1946  /*
1947  * Sleep until something happens or we time out. Also wait for the
1948  * socket becoming writable, if there's still pending output.
1949  * Otherwise we might sit on sendable output data while waiting for
1950  * new WAL to be generated. (But if we have nothing to send, we don't
1951  * want to wake on socket-writable.)
1952  */
1953  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1954 
1955  wakeEvents = WL_SOCKET_READABLE;
1956 
1957  if (pq_is_send_pending())
1958  wakeEvents |= WL_SOCKET_WRITEABLE;
1959 
1960  Assert(wait_event != 0);
1961 
1962  WalSndWait(wakeEvents, sleeptime, wait_event);
1963  }
1964 
1965  /* reactivate latch so WalSndLoop knows to continue */
1966  SetLatch(MyLatch);
1967  return RecentFlushPtr;
1968 }
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1790
bool XLogBackgroundFlush(void)
Definition: xlog.c:2979

References Assert, CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsInvalid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3771 of file walsender.c.

3772 {
3773  for (;;)
3774  {
3775  int i;
3776  bool all_stopped = true;
3777 
3778  for (i = 0; i < max_wal_senders; i++)
3779  {
3780  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3781 
3782  SpinLockAcquire(&walsnd->mutex);
3783 
3784  if (walsnd->pid == 0)
3785  {
3786  SpinLockRelease(&walsnd->mutex);
3787  continue;
3788  }
3789 
3790  if (walsnd->state != WALSNDSTATE_STOPPING)
3791  {
3792  all_stopped = false;
3793  SpinLockRelease(&walsnd->mutex);
3794  break;
3795  }
3796  SpinLockRelease(&walsnd->mutex);
3797  }
3798 
3799  /* safe to leave if confirmation is done for all WAL senders */
3800  if (all_stopped)
3801  return;
3802 
3803  pg_usleep(10000L); /* wait for 10 msec */
3804  }
3805 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3666 of file walsender.c.

3667 {
3668  /*
3669  * Wake up all the walsenders waiting on WAL being flushed or replayed
3670  * respectively. Note that waiting walsender would have prepared to sleep
3671  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3672  * before actually waiting.
3673  */
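3674  if (physical)
3675  ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv);
3676 
3677  if (logical)
3678  ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv);
3679 }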

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1572 of file walsender.c.

1574 {
1575  TimestampTz now;
1576 
1577  /*
1578  * Fill the send timestamp last, so that it is taken as late as possible.
1579  * This is somewhat ugly, but the protocol is set as it's already used for
1580  * several releases by streaming physical replication.
1581  */
1582  resetStringInfo(&tmpbuf);
1583  now = GetCurrentTimestamp();
1584  pq_sendint64(&tmpbuf, now);
1585  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1586  tmpbuf.data, sizeof(int64));
1587 
1588  /* output previously gathered data in a CopyData packet */
1589  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1590 
1591  CHECK_FOR_INTERRUPTS();
1592 
1593  /* Try to flush pending output to the client */
1594  if (pq_flush_if_writable() != 0)
1595  WalSndShutdown();
1596 
1597  /* Try taking fast path unless we get too close to walsender timeout. */
1598  if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1599  wal_sender_timeout / 2) &&
1600  !pq_is_send_pending())
1601  {
1602  return;
1603  }
1604 
1605  /* If we have pending write here, go to slow path */
1606  ProcessPendingWrites();
1607 }

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3376 of file walsender.c.

3377 {
3378  XLogRecord *record;
3379  char *errm;
3380 
3381  /*
3382  * We'll use the current flush point to determine whether we've caught up.
3383  * This variable is static in order to cache it across calls. Caching is
3384  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3385  * spinlock.
3386  */
3387  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3388 
3389  /*
3390  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3391  * true in WalSndWaitForWal, if we're actually waiting. We also set to
3392  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3393  * didn't wait - i.e. when we're shutting down.
3394  */
3395  WalSndCaughtUp = false;
3396 
3397  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3398 
3399  /* xlog record was invalid */
3400  if (errm != NULL)
3401  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3402  errm);
3403 
3404  if (record != NULL)
3405  {
3406  /*
3407  * Note the lack of any call to LagTrackerWrite() which is handled by
3408  * WalSndUpdateProgress which is called by output plugin through
3409  * logical decoding write api.
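3410  */
3411  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
3412 
3413  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
3414  }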
3415 
3416  /*
3417  * If first time through in this session, initialize flushPtr. Otherwise,
3418  * we only need to update flushPtr if EndRecPtr is past it.
3419  */
3420  if (flushPtr == InvalidXLogRecPtr ||
3421  logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3422  {
3423  if (am_cascading_walsender)
3424  flushPtr = GetStandbyFlushRecPtr(NULL);
3425  else
3426  flushPtr = GetFlushRecPtr(NULL);
3427  }
3428 
3429  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3430  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3431  WalSndCaughtUp = true;
3432 
3433  /*
3434  * If we're caught up and have been requested to stop, have WalSndLoop()
3435  * terminate the connection in an orderly manner, after writing out all
3436  * the pending data.
3437  */
3438  if (WalSndCaughtUp && got_STOPPING)
3439  got_SIGUSR2 = true;
3440 
3441  /* Update shared memory status */
3442  {
3443  WalSnd *walsnd = MyWalSnd;
3444 
3445  SpinLockAcquire(&walsnd->mutex);
3446  walsnd->sentPtr = sentPtr;
3447  SpinLockRelease(&walsnd->mutex);
3448  }
3449 }
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:88
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:389

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3066 of file walsender.c.

3067 {
3068  XLogRecPtr SendRqstPtr;
3069  XLogRecPtr startptr;
3070  XLogRecPtr endptr;
3071  Size nbytes;
3072  XLogSegNo segno;
3073  WALReadError errinfo;
3074  Size rbytes;
3075 
3076  /* If requested switch the WAL sender to the stopping state. */
3077  if (got_STOPPING)
3079 
3081  {
3082  WalSndCaughtUp = true;
3083  return;
3084  }
3085 
3086  /* Figure out how far we can safely send the WAL. */
3088  {
3089  /*
3090  * Streaming an old timeline that's in this server's history, but is
3091  * not the one we're currently inserting or replaying. It can be
3092  * streamed up to the point where we switched off that timeline.
3093  */
3094  SendRqstPtr = sendTimeLineValidUpto;
3095  }
3096  else if (am_cascading_walsender)
3097  {
3098  TimeLineID SendRqstTLI;
3099 
3100  /*
3101  * Streaming the latest timeline on a standby.
3102  *
3103  * Attempt to send all WAL that has already been replayed, so that we
3104  * know it's valid. If we're receiving WAL through streaming
3105  * replication, it's also OK to send any WAL that has been received
3106  * but not replayed.
3107  *
3108  * The timeline we're recovering from can change, or we can be
3109  * promoted. In either case, the current timeline becomes historic. We
3110  * need to detect that so that we don't try to stream past the point
3111  * where we switched to another timeline. We check for promotion or
3112  * timeline switch after calculating FlushPtr, to avoid a race
3113  * condition: if the timeline becomes historic just after we checked
3114  * that it was still current, it's still be OK to stream it up to the
3115  * FlushPtr that was calculated before it became historic.
3116  */
3117  bool becameHistoric = false;
3118 
3119  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3120 
3121  if (!RecoveryInProgress())
3122  {
3123  /* We have been promoted. */
3124  SendRqstTLI = GetWALInsertionTimeLine();
3125  am_cascading_walsender = false;
3126  becameHistoric = true;
3127  }
3128  else
3129  {
3130  /*
3131  * Still a cascading standby. But is the timeline we're sending
3132  * still the one recovery is recovering from?
3133  */
3134  if (sendTimeLine != SendRqstTLI)
3135  becameHistoric = true;
3136  }
3137 
3138  if (becameHistoric)
3139  {
3140  /*
3141  * The timeline we were sending has become historic. Read the
3142  * timeline history file of the new timeline to see where exactly
3143  * we forked off from the timeline we were sending.
3144  */
3145  List *history;
3146 
3147  history = readTimeLineHistory(SendRqstTLI);
3149 
3151  list_free_deep(history);
3152 
3153  sendTimeLineIsHistoric = true;
3154 
3155  SendRqstPtr = sendTimeLineValidUpto;
3156  }
3157  }
3158  else
3159  {
3160  /*
3161  * Streaming the current timeline on a primary.
3162  *
3163  * Attempt to send all data that's already been written out and
3164  * fsync'd to disk. We cannot go further than what's been written out
3165  * given the current implementation of WALRead(). And in any case
3166  * it's unsafe to send WAL that is not securely down to disk on the
3167  * primary: if the primary subsequently crashes and restarts, standbys
3168  * must not have applied any WAL that got lost on the primary.
3169  */
3170  SendRqstPtr = GetFlushRecPtr(NULL);
3171  }
3172 
3173  /*
3174  * Record the current system time as an approximation of the time at which
3175  * this WAL location was written for the purposes of lag tracking.
3176  *
3177  * In theory we could make XLogFlush() record a time in shmem whenever WAL
3178  * is flushed and we could get that time as well as the LSN when we call
3179  * GetFlushRecPtr() above (and likewise for the cascading standby
3180  * equivalent), but rather than putting any new code into the hot WAL path
3181  * it seems good enough to capture the time here. We should reach this
3182  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3183  * may take some time, we read the WAL flush pointer and take the time
3184  * very close to together here so that we'll get a later position if it is
3185  * still moving.
3186  *
3187  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3188  * this gives us a cheap approximation for the WAL flush time for this
3189  * LSN.
3190  *
3191  * Note that the LSN is not necessarily the LSN for the data contained in
3192  * the present message; it's the end of the WAL, which might be further
3193  * ahead. All the lag tracking machinery cares about is finding out when
3194  * that arbitrary LSN is eventually reported as written, flushed and
3195  * applied, so that it can measure the elapsed time.
3196  */
3197  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3198 
3199  /*
3200  * If this is a historic timeline and we've reached the point where we
3201  * forked to the next timeline, stop streaming.
3202  *
3203  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3204  * startup process will normally replay all WAL that has been received
3205  * from the primary, before promoting, but if the WAL streaming is
3206  * terminated at a WAL page boundary, the valid portion of the timeline
3207  * might end in the middle of a WAL record. We might've already sent the
3208  * first half of that partial WAL record to the cascading standby, so that
3209  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3210  * replay the partial WAL record either, so it can still follow our
3211  * timeline switch.
3212  */
3214  {
3215  /* close the current file. */
3216  if (xlogreader->seg.ws_file >= 0)
3218 
3219  /* Send CopyDone */
3220  pq_putmessage_noblock('c', NULL, 0);
3221  streamingDoneSending = true;
3222 
3223  WalSndCaughtUp = true;
3224 
3225  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3228  return;
3229  }
3230 
3231  /* Do we have any work to do? */
3232  Assert(sentPtr <= SendRqstPtr);
3233  if (SendRqstPtr <= sentPtr)
3234  {
3235  WalSndCaughtUp = true;
3236  return;
3237  }
3238 
3239  /*
3240  * Figure out how much to send in one message. If there's no more than
3241  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3242  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3243  *
3244  * The rounding is not only for performance reasons. Walreceiver relies on
3245  * the fact that we never split a WAL record across two messages. Since a
3246  * long WAL record is split at page boundary into continuation records,
3247  * page boundary is always a safe cut-off point. We also assume that
3248  * SendRqstPtr never points to the middle of a WAL record.
3249  */
3250  startptr = sentPtr;
3251  endptr = startptr;
3252  endptr += MAX_SEND_SIZE;
3253 
3254  /* if we went beyond SendRqstPtr, back off */
3255  if (SendRqstPtr <= endptr)
3256  {
3257  endptr = SendRqstPtr;
3259  WalSndCaughtUp = false;
3260  else
3261  WalSndCaughtUp = true;
3262  }
3263  else
3264  {
3265  /* round down to page boundary. */
3266  endptr -= (endptr % XLOG_BLCKSZ);
3267  WalSndCaughtUp = false;
3268  }
3269 
3270  nbytes = endptr - startptr;
3271  Assert(nbytes <= MAX_SEND_SIZE);
3272 
3273  /*
3274  * OK to read and send the slice.
3275  */
3277  pq_sendbyte(&output_message, 'w');
3278 
3279  pq_sendint64(&output_message, startptr); /* dataStart */
3280  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3281  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3282 
3283  /*
3284  * Read the log directly into the output buffer to avoid extra memcpy
3285  * calls.
3286  */
3288 
3289 retry:
3290  /* attempt to read WAL from WAL buffers first */
3292  startptr, nbytes, xlogreader->seg.ws_tli);
3293  output_message.len += rbytes;
3294  startptr += rbytes;
3295  nbytes -= rbytes;
3296 
3297  /* now read the remaining WAL from WAL file */
3298  if (nbytes > 0 &&
3301  startptr,
3302  nbytes,
3303  xlogreader->seg.ws_tli, /* Pass the current TLI because
3304  * only WalSndSegmentOpen controls
3305  * whether new TLI is needed. */
3306  &errinfo))
3307  WALReadRaiseError(&errinfo);
3308 
3309  /* See logical_read_xlog_page(). */
3310  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3312 
3313  /*
3314  * During recovery, the currently-open WAL file might be replaced with the
3315  * file of the same name retrieved from archive. So we always need to
3316  * check what we read was valid after reading into the buffer. If it's
3317  * invalid, we try to open and read the file again.
3318  */
3320  {
3321  WalSnd *walsnd = MyWalSnd;
3322  bool reload;
3323 
3324  SpinLockAcquire(&walsnd->mutex);
3325  reload = walsnd->needreload;
3326  walsnd->needreload = false;
3327  SpinLockRelease(&walsnd->mutex);
3328 
3329  if (reload && xlogreader->seg.ws_file >= 0)
3330  {
3332 
3333  goto retry;
3334  }
3335  }