PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void ProcessPendingWrites (void)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 208 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 107 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 226 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1047 of file walsender.c.

1048 {
1049  const char *snapshot_name = NULL;
1050  char xloc[MAXFNAMELEN];
1051  char *slot_name;
1052  bool reserve_wal = false;
1053  bool two_phase = false;
1054  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1055  DestReceiver *dest;
1056  TupOutputState *tstate;
1057  TupleDesc tupdesc;
1058  Datum values[4];
1059  bool nulls[4];
1060 
1062 
1063  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1064 
1065  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1066  {
1067  ReplicationSlotCreate(cmd->slotname, false,
1069  false);
1070  }
1071  else
1072  {
1074 
1075  /*
1076  * Initially create persistent slot as ephemeral - that allows us to
1077  * nicely handle errors during initialization because it'll get
1078  * dropped if this transaction fails. We'll make it persistent at the
1079  * end. Temporary slots can be created as temporary from beginning as
1080  * they get dropped on error as well.
1081  */
1082  ReplicationSlotCreate(cmd->slotname, true,
1084  two_phase);
1085  }
1086 
1087  if (cmd->kind == REPLICATION_KIND_LOGICAL)
1088  {
1090  bool need_full_snapshot = false;
1091 
1092  /*
1093  * Do options check early so that we can bail before calling the
1094  * DecodingContextFindStartpoint which can take a long time.
1095  */
1096  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1097  {
1098  if (IsTransactionBlock())
1099  ereport(ERROR,
1100  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1101  (errmsg("%s must not be called inside a transaction",
1102  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1103 
1104  need_full_snapshot = true;
1105  }
1106  else if (snapshot_action == CRS_USE_SNAPSHOT)
1107  {
1108  if (!IsTransactionBlock())
1109  ereport(ERROR,
1110  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1111  (errmsg("%s must be called inside a transaction",
1112  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1113 
1115  ereport(ERROR,
1116  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1117  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1118  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1119 
1120  if (FirstSnapshotSet)
1121  ereport(ERROR,
1122  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1123  (errmsg("%s must be called before any query",
1124  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1125 
1126  if (IsSubTransaction())
1127  ereport(ERROR,
1128  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1129  (errmsg("%s must not be called in a subtransaction",
1130  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1131 
1132  need_full_snapshot = true;
1133  }
1134 
1135  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1137  XL_ROUTINE(.page_read = logical_read_xlog_page,
1138  .segment_open = WalSndSegmentOpen,
1139  .segment_close = wal_segment_close),
1142 
1143  /*
1144  * Signal that we don't need the timeout mechanism. We're just
1145  * creating the replication slot and don't yet accept feedback
1146  * messages or send keepalives. As we possibly need to wait for
1147  * further WAL the walsender would otherwise possibly be killed too
1148  * soon.
1149  */
1151 
1152  /* build initial snapshot, might take a while */
1154 
1155  /*
1156  * Export or use the snapshot if we've been asked to do so.
1157  *
1158  * NB. We will convert the snapbuild.c kind of snapshot to normal
1159  * snapshot when doing this.
1160  */
1161  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1162  {
1163  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1164  }
1165  else if (snapshot_action == CRS_USE_SNAPSHOT)
1166  {
1167  Snapshot snap;
1168 
1171  }
1172 
1173  /* don't need the decoding context anymore */
1174  FreeDecodingContext(ctx);
1175 
1176  if (!cmd->temporary)
1178  }
1179  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1180  {
1182 
1184 
1185  /* Write this slot to disk if it's a permanent one. */
1186  if (!cmd->temporary)
1188  }
1189 
1190  snprintf(xloc, sizeof(xloc), "%X/%X",
1192 
1194  MemSet(nulls, false, sizeof(nulls));
1195 
1196  /*----------
1197  * Need a tuple descriptor representing four columns:
1198  * - first field: the slot name
1199  * - second field: LSN at which we became consistent
1200  * - third field: exported snapshot's name
1201  * - fourth field: output plugin
1202  *----------
1203  */
1204  tupdesc = CreateTemplateTupleDesc(4);
1205  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1206  TEXTOID, -1, 0);
1207  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1208  TEXTOID, -1, 0);
1209  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1210  TEXTOID, -1, 0);
1211  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1212  TEXTOID, -1, 0);
1213 
1214  /* prepare for projection of tuples */
1215  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1216 
1217  /* slot_name */
1218  slot_name = NameStr(MyReplicationSlot->data.name);
1219  values[0] = CStringGetTextDatum(slot_name);
1220 
1221  /* consistent wal location */
1222  values[1] = CStringGetTextDatum(xloc);
1223 
1224  /* snapshot name, or NULL if none */
1225  if (snapshot_name != NULL)
1226  values[2] = CStringGetTextDatum(snapshot_name);
1227  else
1228  nulls[2] = true;
1229 
1230  /* plugin, or NULL if none */
1231  if (cmd->plugin != NULL)
1232  values[3] = CStringGetTextDatum(cmd->plugin);
1233  else
1234  nulls[3] = true;
1235 
1236  /* send it to dest */
1237  do_tup_output(tstate, values, nulls);
1238  end_tup_output(tstate);
1239 
1241 }
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:85
#define NameStr(name)
Definition: c.h:681
#define MemSet(start, val, len)
Definition: c.h:1008
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:93
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2333
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2275
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2255
Assert(fmt[strlen(fmt) - 1] !='\n')
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:634
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:590
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:319
#define NIL
Definition: pg_list.h:65
static bool two_phase
#define snprintf
Definition: port.h:225
uintptr_t Datum
Definition: postgres.h:411
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotMarkDirty(void)
Definition: slot.c:798
void ReplicationSlotReserveWal(void)
Definition: slot.c:1151
void ReplicationSlotPersist(void)
Definition: slot.c:815
ReplicationSlot * MyReplicationSlot
Definition: slot.c:97
void ReplicationSlotSave(void)
Definition: slot.c:780
void ReplicationSlotRelease(void)
Definition: slot.c:522
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:255
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:622
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:530
bool FirstSnapshotSet
Definition: snapmgr.c:149
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2269
PGPROC * MyProc
Definition: proc.c:68
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:84
ReplicationSlotPersistentData data
Definition: slot.h:147
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2660
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:980
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1384
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1480
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:919
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1357
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
int XactIsoLevel
Definition: xact.c:78
bool IsSubTransaction(void)
Definition: xact.c:4839
bool IsTransactionBlock(void)
Definition: xact.c:4766
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:856

References Assert(), begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1247 of file walsender.c.

1248 {
1249  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1250 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:625

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1682 of file walsender.c.

1683 {
1684  int parse_rc;
1685  Node *cmd_node;
1686  const char *cmdtag;
1687  MemoryContext cmd_context;
1688  MemoryContext old_context;
1689 
1690  /*
1691  * If WAL sender has been told that shutdown is getting close, switch its
1692  * status accordingly to handle the next replication commands correctly.
1693  */
1694  if (got_STOPPING)
1696 
1697  /*
1698  * Throw error if in stopping mode. We need to prevent commands that could
1699  * generate WAL while the shutdown checkpoint is being written. To be
1700  * safe, we just prohibit all new commands.
1701  */
1703  ereport(ERROR,
1704  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1705  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1706 
1707  /*
1708  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1709  * command arrives. Clean up the old stuff if there's anything.
1710  */
1712 
1714 
1715  /*
1716  * Prepare to parse and execute the command.
1717  */
1719  "Replication command context",
1721  old_context = MemoryContextSwitchTo(cmd_context);
1722 
1723  replication_scanner_init(cmd_string);
1724 
1725  /*
1726  * Is it a WalSender command?
1727  */
1729  {
1730  /* Nope; clean up and get out. */
1732 
1733  MemoryContextSwitchTo(old_context);
1734  MemoryContextDelete(cmd_context);
1735 
1736  /* XXX this is a pretty random place to make this check */
1737  if (MyDatabaseId == InvalidOid)
1738  ereport(ERROR,
1739  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1740  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1741 
1742  /* Tell the caller that this wasn't a WalSender command. */
1743  return false;
1744  }
1745 
1746  /*
1747  * Looks like a WalSender command, so parse it.
1748  */
1749  parse_rc = replication_yyparse();
1750  if (parse_rc != 0)
1751  ereport(ERROR,
1752  (errcode(ERRCODE_SYNTAX_ERROR),
1753  errmsg_internal("replication command parser returned %d",
1754  parse_rc)));
1756 
1757  cmd_node = replication_parse_result;
1758 
1759  /*
1760  * Report query to various monitoring facilities. For this purpose, we
1761  * report replication commands just like SQL commands.
1762  */
1763  debug_query_string = cmd_string;
1764 
1766 
1767  /*
1768  * Log replication command if log_replication_commands is enabled. Even
1769  * when it's disabled, log the command with DEBUG1 level for backward
1770  * compatibility.
1771  */
1773  (errmsg("received replication command: %s", cmd_string)));
1774 
1775  /*
1776  * Disallow replication commands in aborted transaction blocks.
1777  */
1779  ereport(ERROR,
1780  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1781  errmsg("current transaction is aborted, "
1782  "commands ignored until end of transaction block")));
1783 
1785 
1786  /*
1787  * Allocate buffers that will be used for each outgoing and incoming
1788  * message. We do this just once per command to reduce palloc overhead.
1789  */
1793 
1794  switch (cmd_node->type)
1795  {
1796  case T_IdentifySystemCmd:
1797  cmdtag = "IDENTIFY_SYSTEM";
1798  set_ps_display(cmdtag);
1799  IdentifySystem();
1800  EndReplicationCommand(cmdtag);
1801  break;
1802 
1804  cmdtag = "READ_REPLICATION_SLOT";
1805  set_ps_display(cmdtag);
1807  EndReplicationCommand(cmdtag);
1808  break;
1809 
1810  case T_BaseBackupCmd:
1811  cmdtag = "BASE_BACKUP";
1812  set_ps_display(cmdtag);
1813  PreventInTransactionBlock(true, cmdtag);
1814  SendBaseBackup((BaseBackupCmd *) cmd_node);
1815  EndReplicationCommand(cmdtag);
1816  break;
1817 
1819  cmdtag = "CREATE_REPLICATION_SLOT";
1820  set_ps_display(cmdtag);
1822  EndReplicationCommand(cmdtag);
1823  break;
1824 
1826  cmdtag = "DROP_REPLICATION_SLOT";
1827  set_ps_display(cmdtag);
1829  EndReplicationCommand(cmdtag);
1830  break;
1831 
1832  case T_StartReplicationCmd:
1833  {
1834  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1835 
1836  cmdtag = "START_REPLICATION";
1837  set_ps_display(cmdtag);
1838  PreventInTransactionBlock(true, cmdtag);
1839 
1840  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1841  StartReplication(cmd);
1842  else
1844 
1845  /* dupe, but necessary per libpqrcv_endstreaming */
1846  EndReplicationCommand(cmdtag);
1847 
1848  Assert(xlogreader != NULL);
1849  break;
1850  }
1851 
1852  case T_TimeLineHistoryCmd:
1853  cmdtag = "TIMELINE_HISTORY";
1854  set_ps_display(cmdtag);
1855  PreventInTransactionBlock(true, cmdtag);
1857  EndReplicationCommand(cmdtag);
1858  break;
1859 
1860  case T_VariableShowStmt:
1861  {
1863  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1864 
1865  cmdtag = "SHOW";
1866  set_ps_display(cmdtag);
1867 
1868  /* syscache access needs a transaction environment */
1870  GetPGVariable(n->name, dest);
1872  EndReplicationCommand(cmdtag);
1873  }
1874  break;
1875 
1876  default:
1877  elog(ERROR, "unrecognized replication command node tag: %u",
1878  cmd_node->type);
1879  }
1880 
1881  /* done */
1882  MemoryContextSwitchTo(old_context);
1883  MemoryContextDelete(cmd_context);
1884 
1885  /*
1886  * We need not update ps display or pg_stat_activity, because PostgresMain
1887  * will reset those to "idle". But we must reset debug_query_string to
1888  * ensure it doesn't become a dangling pointer.
1889  */
1890  debug_query_string = NULL;
1891 
1892  return true;
1893 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:948
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
int errcode(int sqlerrcode)
Definition: elog.c:693
#define LOG
Definition: elog.h:25
#define DEBUG1
Definition: elog.h:24
#define elog(elevel,...)
Definition: elog.h:218
Oid MyDatabaseId
Definition: globals.c:89
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9666
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
@ T_IdentifySystemCmd
Definition: nodes.h:531
@ T_BaseBackupCmd
Definition: nodes.h:532
@ T_VariableShowStmt
Definition: nodes.h:376
@ T_ReadReplicationSlotCmd
Definition: nodes.h:535
@ T_DropReplicationSlotCmd
Definition: nodes.h:534
@ T_TimeLineHistoryCmd
Definition: nodes.h:537
@ T_StartReplicationCmd
Definition: nodes.h:536
@ T_CreateReplicationSlotCmd
Definition: nodes.h:533
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
const char * debug_query_string
Definition: postgres.c:89
#define InvalidOid
Definition: postgres_ext.h:36
void set_ps_display(const char *activity)
Definition: ps_status.c:349
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:683
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:574
NodeTag type
Definition: nodes.h:575
ReplicationKind kind
Definition: replnodes.h:82
WalSndState state
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:580
WalSnd * MyWalSnd
Definition: walsender.c:113
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:479
static StringInfoData tmpbuf
Definition: walsender.c:160
static void IdentifySystem(void)
Definition: walsender.c:395
static StringInfoData reply_message
Definition: walsender.c:159
void WalSndSetState(WalSndState state)
Definition: walsender.c:3418
static StringInfoData output_message
Definition: walsender.c:158
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
bool log_replication_commands
Definition: walsender.c:126
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1047
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1257
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1247
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:686
static XLogReaderState * xlogreader
Definition: walsender.c:138
PGDLLIMPORT Node * replication_parse_result
void replication_scanner_init(const char *query_string)
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3462
void StartTransactionCommand(void)
Definition: xact.c:2925
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:394
void CommitTransactionCommand(void)
Definition: xact.c:3022

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_ReadReplicationSlotCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, tmpbuf, Node::type, ReadReplicationSlotCmd::type, WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)
static

Definition at line 3159 of file walsender.c.

3160 {
3161  XLogRecPtr replayPtr;
3162  TimeLineID replayTLI;
3163  XLogRecPtr receivePtr;
3165  XLogRecPtr result;
3166 
3167  /*
3168  * We can safely send what's already been replayed. Also, if walreceiver
3169  * is streaming WAL from the same timeline, we can send anything that it
3170  * has streamed, but hasn't been replayed yet.
3171  */
3172 
3173  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3174  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3175 
3176  *tli = replayTLI;
3177 
3178  result = replayPtr;
3179  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3180  result = receivePtr;
3181 
3182  return result;
3183 }
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:260
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3212 of file walsender.c.

3213 {
3215 
3216  /*
3217  * If replication has not yet started, die like with SIGTERM. If
3218  * replication is active, only set a flag and wake up the main loop. It
3219  * will send any outstanding WAL, wait for it to be replicated to the
3220  * standby, and then exit gracefully.
3221  */
3222  if (!replication_active)
3223  kill(MyProcPid, SIGTERM);
3224  else
3225  got_STOPPING = true;
3226 }
int MyProcPid
Definition: globals.c:44
bool am_walsender
Definition: walsender.c:116
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
#define kill(pid, sig)
Definition: win32_port.h:464

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 395 of file walsender.c.

396 {
397  char sysid[32];
398  char xloc[MAXFNAMELEN];
399  XLogRecPtr logptr;
400  char *dbname = NULL;
402  TupOutputState *tstate;
403  TupleDesc tupdesc;
404  Datum values[4];
405  bool nulls[4];
406  TimeLineID currTLI;
407 
408  /*
409  * Reply with a result set with one row, four columns. First col is system
410  * ID, second is timeline ID, third is current xlog location and the
411  * fourth contains the database name if we are connected to one.
412  */
413 
414  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
416 
419  logptr = GetStandbyFlushRecPtr(&currTLI);
420  else
421  logptr = GetFlushRecPtr(&currTLI);
422 
423  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
424 
425  if (MyDatabaseId != InvalidOid)
426  {
428 
429  /* syscache access needs a transaction env. */
431  /* make dbname live outside TX context */
435  /* CommitTransactionCommand switches to TopMemoryContext */
437  }
438 
440  MemSet(nulls, false, sizeof(nulls));
441 
442  /* need a tuple descriptor representing four columns */
443  tupdesc = CreateTemplateTupleDesc(4);
444  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
445  TEXTOID, -1, 0);
446  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
447  INT4OID, -1, 0);
448  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
449  TEXTOID, -1, 0);
450  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
451  TEXTOID, -1, 0);
452 
453  /* prepare for projection of tuples */
454  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
455 
456  /* column 1: system identifier */
457  values[0] = CStringGetTextDatum(sysid);
458 
459  /* column 2: timeline */
460  values[1] = Int32GetDatum(currTLI);
461 
462  /* column 3: wal location */
463  values[2] = CStringGetTextDatum(xloc);
464 
465  /* column 4: database name, or NULL if none */
466  if (dbname)
468  else
469  nulls[3] = true;
470 
471  /* send it to dest */
472  do_tup_output(tstate, values, nulls);
473 
474  end_tup_output(tstate);
475 }
#define UINT64_FORMAT
Definition: c.h:484
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2994
struct cursor * cur
Definition: ecpg.c:28
#define Int32GetDatum(X)
Definition: postgres.h:523
char * dbname
Definition: streamutil.c:51
bool am_cascading_walsender
Definition: walsender.c:117
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3159
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4206
bool RecoveryInProgress(void)
Definition: xlog.c:5753
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:5918

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2582 of file walsender.c.

2583 {
2584  int i;
2585 
2586  /*
2587  * WalSndCtl should be set up already (we inherit this by fork() or
2588  * EXEC_BACKEND mechanism from the postmaster).
2589  */
2590  Assert(WalSndCtl != NULL);
2591  Assert(MyWalSnd == NULL);
2592 
2593  /*
2594  * Find a free walsender slot and reserve it. This must not fail due to
2595  * the prior check for free WAL senders in InitProcess().
2596  */
2597  for (i = 0; i < max_wal_senders; i++)
2598  {
2599  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2600 
2601  SpinLockAcquire(&walsnd->mutex);
2602 
2603  if (walsnd->pid != 0)
2604  {
2605  SpinLockRelease(&walsnd->mutex);
2606  continue;
2607  }
2608  else
2609  {
2610  /*
2611  * Found a free slot. Reserve it for us.
2612  */
2613  walsnd->pid = MyProcPid;
2614  walsnd->state = WALSNDSTATE_STARTUP;
2615  walsnd->sentPtr = InvalidXLogRecPtr;
2616  walsnd->needreload = false;
2617  walsnd->write = InvalidXLogRecPtr;
2618  walsnd->flush = InvalidXLogRecPtr;
2619  walsnd->apply = InvalidXLogRecPtr;
2620  walsnd->writeLag = -1;
2621  walsnd->flushLag = -1;
2622  walsnd->applyLag = -1;
2623  walsnd->sync_standby_priority = 0;
2624  walsnd->latch = &MyProc->procLatch;
2625  walsnd->replyTime = 0;
2626  SpinLockRelease(&walsnd->mutex);
2627  /* don't need the lock anymore */
2628  MyWalSnd = (WalSnd *) walsnd;
2629 
2630  break;
2631  }
2632  }
2633 
2634  Assert(MyWalSnd != NULL);
2635 
2636  /* Arrange to clean up at walsender exit */
2638 }
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int i
Definition: isn.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch procLatch
Definition: proc.h:168
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:122
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2642
WalSndCtlData * WalSndCtl
Definition: walsender.c:110
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert(), WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3770 of file walsender.c.

3771 {
3772  TimestampTz time = 0;
3773 
3774  /* Read all unread samples up to this LSN or end of buffer. */
3775  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3776  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3777  {
3778  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3779  lag_tracker->last_read[head] =
3781  lag_tracker->read_heads[head] =
3783  }
3784 
3785  /*
3786  * If the lag tracker is empty, that means the standby has processed
3787  * everything we've ever sent so we should now clear 'last_read'. If we
3788  * didn't do that, we'd risk using a stale and irrelevant sample for
3789  * interpolation at the beginning of the next burst of WAL after a period
3790  * of idleness.
3791  */
3793  lag_tracker->last_read[head].time = 0;
3794 
3795  if (time > now)
3796  {
3797  /* If the clock somehow went backwards, treat as not found. */
3798  return -1;
3799  }
3800  else if (time == 0)
3801  {
3802  /*
3803  * We didn't cross a time. If there is a future sample that we
3804  * haven't reached yet, and we've already reached at least one sample,
3805  * let's interpolate the local flushed time. This is mainly useful
3806  * for reporting a completely stuck apply position as having
3807  * increasing lag, since otherwise we'd have to wait for it to
3808  * eventually start moving again and cross one of our samples before
3809  * we can show the lag increasing.
3810  */
3812  {
3813  /* There are no future samples, so we can't interpolate. */
3814  return -1;
3815  }
3816  else if (lag_tracker->last_read[head].time != 0)
3817  {
3818  /* We can interpolate between last_read and the next sample. */
3819  double fraction;
3820  WalTimeSample prev = lag_tracker->last_read[head];
3822 
3823  if (lsn < prev.lsn)
3824  {
3825  /*
3826  * Reported LSNs shouldn't normally go backwards, but it's
3827  * possible when there is a timeline change. Treat as not
3828  * found.
3829  */
3830  return -1;
3831  }
3832 
3833  Assert(prev.lsn < next.lsn);
3834 
3835  if (prev.time > next.time)
3836  {
3837  /* If the clock somehow went backwards, treat as not found. */
3838  return -1;
3839  }
3840 
3841  /* See how far we are between the previous and next samples. */
3842  fraction =
3843  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3844 
3845  /* Scale the local flush time proportionally. */
3846  time = (TimestampTz)
3847  ((double) prev.time + (next.time - prev.time) * fraction);
3848  }
3849  else
3850  {
3851  /*
3852  * We have only a future sample, implying that we were entirely
3853  * caught up, but now there is a new burst of WAL and the
3854  * standby hasn't processed the first sample yet. Until the
3855  * standby reaches the future sample the best we can do is report
3856  * the hypothetical lag if that sample were to be replayed now.
3857  */
3858  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3859  }
3860  }
3861 
3862  /* Return the elapsed time since local flush time in microseconds. */
3863  Assert(time != 0);
3864  return now - time;
3865 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
static int32 next
Definition: blutils.c:219
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:217
int write_head
Definition: walsender.c:215
TimestampTz time
Definition: walsender.c:204
XLogRecPtr lsn
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:220
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208

References Assert(), LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3705 of file walsender.c.

3706 {
3707  bool buffer_full;
3708  int new_write_head;
3709  int i;
3710 
3711  if (!am_walsender)
3712  return;
3713 
3714  /*
3715  * If the lsn hasn't advanced since last time, then do nothing. This way
3716  * we only record a new sample when new WAL has been written.
3717  */
3718  if (lag_tracker->last_lsn == lsn)
3719  return;
3720  lag_tracker->last_lsn = lsn;
3721 
3722  /*
3723  * If advancing the write head of the circular buffer would crash into any
3724  * of the read heads, then the buffer is full. In other words, the
3725  * slowest reader (presumably apply) is the one that controls the release
3726  * of space.
3727  */
3728  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3729  buffer_full = false;
3730  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3731  {
3732  if (new_write_head == lag_tracker->read_heads[i])
3733  buffer_full = true;
3734  }
3735 
3736  /*
3737  * If the buffer is full, for now we just rewind by one slot and overwrite
3738  * the last sample, as a simple (if somewhat uneven) way to lower the
3739  * sampling rate. There may be better adaptive compaction algorithms.
3740  */
3741  if (buffer_full)
3742  {
3743  new_write_head = lag_tracker->write_head;
3744  if (lag_tracker->write_head > 0)
3746  else
3748  }
3749 
3750  /* Store a sample at the current write head position. */
3752  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3753  lag_tracker->write_head = new_write_head;
3754 }
XLogRecPtr last_lsn
Definition: walsender.c:213
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 919 of file walsender.c.

921 {
922  XLogRecPtr flushptr;
923  int count;
924  WALReadError errinfo;
925  XLogSegNo segno;
927 
928  /*
929  * Since logical decoding is only permitted on a primary server, we know
930  * that the current timeline ID can't be changing any more. If we did this
931  * on a standby, we'd have to worry about the values we compute here
932  * becoming invalid due to a promotion or timeline change.
933  */
934  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
935  sendTimeLineIsHistoric = (state->currTLI != currTLI);
936  sendTimeLine = state->currTLI;
937  sendTimeLineValidUpto = state->currTLIValidUntil;
938  sendTimeLineNextTLI = state->nextTLI;
939 
940  /* make sure we have enough WAL available */
941  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
942 
943  /* fail if not (implies we are going to shut down) */
944  if (flushptr < targetPagePtr + reqLen)
945  return -1;
946 
947  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
948  count = XLOG_BLCKSZ; /* more than one block available */
949  else
950  count = flushptr - targetPagePtr; /* part of the page available */
951 
952  /* now actually read the data, we know it's there */
953  if (!WALRead(state,
954  cur_page,
955  targetPagePtr,
956  XLOG_BLCKSZ,
957  state->seg.ws_tli, /* Pass the current TLI because only
958  * WalSndSegmentOpen controls whether new
959  * TLI is needed. */
960  &errinfo))
961  WALReadRaiseError(&errinfo);
962 
963  /*
964  * After reading into the buffer, check that what we read was valid. We do
965  * this after reading, because even though the segment was present when we
966  * opened it, it might get recycled or removed while we read it. The
967  * read() succeeds in that case, but the data we tried to read might
968  * already have been overwritten with new WAL records.
969  */
970  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
971  CheckXLogRemoved(segno, state->seg.ws_tli);
972 
973  return count;
974 }
Definition: regguts.h:318
static TimeLineID sendTimeLine
Definition: walsender.c:146
static bool sendTimeLineIsHistoric
Definition: walsender.c:148
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1548
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:147
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:149
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:5941
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3454
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1460
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:732
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1039

References CheckXLogRemoved(), GetWALInsertionTimeLine(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3456 of file walsender.c.

3457 {
3458  Interval *result = palloc(sizeof(Interval));
3459 
3460  result->month = 0;
3461  result->day = 0;
3462  result->time = offset;
3463 
3464  return result;
3465 }
void * palloc(Size size)
Definition: mcxt.c:1068
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action,
bool two_phase 
)
static

Definition at line 980 of file walsender.c.

984 {
985  ListCell *lc;
986  bool snapshot_action_given = false;
987  bool reserve_wal_given = false;
988  bool two_phase_given = false;
989 
990  /* Parse options */
991  foreach(lc, cmd->options)
992  {
993  DefElem *defel = (DefElem *) lfirst(lc);
994 
995  if (strcmp(defel->defname, "snapshot") == 0)
996  {
997  char *action;
998 
999  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1000  ereport(ERROR,
1001  (errcode(ERRCODE_SYNTAX_ERROR),
1002  errmsg("conflicting or redundant options")));
1003 
1004  action = defGetString(defel);
1005  snapshot_action_given = true;
1006 
1007  if (strcmp(action, "export") == 0)
1008  *snapshot_action = CRS_EXPORT_SNAPSHOT;
1009  else if (strcmp(action, "nothing") == 0)
1010  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1011  else if (strcmp(action, "use") == 0)
1012  *snapshot_action = CRS_USE_SNAPSHOT;
1013  else
1014  ereport(ERROR,
1015  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1016  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1017  defel->defname, action)));
1018  }
1019  else if (strcmp(defel->defname, "reserve_wal") == 0)
1020  {
1021  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1022  ereport(ERROR,
1023  (errcode(ERRCODE_SYNTAX_ERROR),
1024  errmsg("conflicting or redundant options")));
1025 
1026  reserve_wal_given = true;
1027  *reserve_wal = defGetBoolean(defel);
1028  }
1029  else if (strcmp(defel->defname, "two_phase") == 0)
1030  {
1031  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1032  ereport(ERROR,
1033  (errcode(ERRCODE_SYNTAX_ERROR),
1034  errmsg("conflicting or redundant options")));
1035  two_phase_given = true;
1036  *two_phase = defGetBoolean(defel);
1037  }
1038  else
1039  elog(ERROR, "unrecognized option: %s", defel->defname);
1040  }
1041 }
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
#define lfirst(lc)
Definition: pg_list.h:169
char * defname
Definition: parsenodes.h:765
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3472 of file walsender.c.

3473 {
3474 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3475  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3476  SyncRepStandbyData *sync_standbys;
3477  int num_standbys;
3478  int i;
3479 
3480  SetSingleFuncCall(fcinfo, 0);
3481 
3482  /*
3483  * Get the currently active synchronous standbys. This could be out of
3484  * date before we're done, but we'll use the data anyway.
3485  */
3486  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3487 
3488  for (i = 0; i < max_wal_senders; i++)
3489  {
3490  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3492  XLogRecPtr write;
3493  XLogRecPtr flush;
3494  XLogRecPtr apply;
3495  TimeOffset writeLag;
3496  TimeOffset flushLag;
3497  TimeOffset applyLag;
3498  int priority;
3499  int pid;
3501  TimestampTz replyTime;
3502  bool is_sync_standby;
3504  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3505  int j;
3506 
3507  /* Collect data from shared memory */
3508  SpinLockAcquire(&walsnd->mutex);
3509  if (walsnd->pid == 0)
3510  {
3511  SpinLockRelease(&walsnd->mutex);
3512  continue;
3513  }
3514  pid = walsnd->pid;
3515  sentPtr = walsnd->sentPtr;
3516  state = walsnd->state;
3517  write = walsnd->write;
3518  flush = walsnd->flush;
3519  apply = walsnd->apply;
3520  writeLag = walsnd->writeLag;
3521  flushLag = walsnd->flushLag;
3522  applyLag = walsnd->applyLag;
3523  priority = walsnd->sync_standby_priority;
3524  replyTime = walsnd->replyTime;
3525  SpinLockRelease(&walsnd->mutex);
3526 
3527  /*
3528  * Detect whether walsender is/was considered synchronous. We can
3529  * provide some protection against stale data by checking the PID
3530  * along with walsnd_index.
3531  */
3532  is_sync_standby = false;
3533  for (j = 0; j < num_standbys; j++)
3534  {
3535  if (sync_standbys[j].walsnd_index == i &&
3536  sync_standbys[j].pid == pid)
3537  {
3538  is_sync_standby = true;
3539  break;
3540  }
3541  }
3542 
3543  memset(nulls, 0, sizeof(nulls));
3544  values[0] = Int32GetDatum(pid);
3545 
3546  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3547  {
3548  /*
3549  * Only superusers and roles with privileges of pg_read_all_stats
3550  * can see details. Other users only get the pid value to know
3551  * it's a walsender, but no details.
3552  */
3553  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3554  }
3555  else
3556  {
3558 
3560  nulls[2] = true;
3561  values[2] = LSNGetDatum(sentPtr);
3562 
3564  nulls[3] = true;
3565  values[3] = LSNGetDatum(write);
3566 
3567  if (XLogRecPtrIsInvalid(flush))
3568  nulls[4] = true;
3569  values[4] = LSNGetDatum(flush);
3570 
3571  if (XLogRecPtrIsInvalid(apply))
3572  nulls[5] = true;
3573  values[5] = LSNGetDatum(apply);
3574 
3575  /*
3576  * Treat a standby that always returns an invalid flush
3577  * location, such as a pg_basebackup background process, as an
3578  * asynchronous standby.
3579  */
3580  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3581 
3582  if (writeLag < 0)
3583  nulls[6] = true;
3584  else
3586 
3587  if (flushLag < 0)
3588  nulls[7] = true;
3589  else
3591 
3592  if (applyLag < 0)
3593  nulls[8] = true;
3594  else
3596 
3597  values[9] = Int32GetDatum(priority);
3598 
3599  /*
3600  * More easily understood version of standby state. This is purely
3601  * informational.
3602  *
3603  * In quorum-based sync replication, the role of each standby
3604  * listed in synchronous_standby_names can be changing very
3605  * frequently. Any standbys considered as "sync" at one moment can
3606  * be switched to "potential" ones at the next moment. So, it's
3607  * basically useless to report "sync" or "potential" as their sync
3608  * states. We report just "quorum" for them.
3609  */
3610  if (priority == 0)
3611  values[10] = CStringGetTextDatum("async");
3612  else if (is_sync_standby)
3614  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3615  else
3616  values[10] = CStringGetTextDatum("potential");
3617 
3618  if (replyTime == 0)
3619  nulls[11] = true;
3620  else
3621  values[11] = TimestampTzGetDatum(replyTime);
3622  }
3623 
3624  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3625  values, nulls);
3626  }
3627 
3628  return (Datum) 0;
3629 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4951
int64 TimeOffset
Definition: timestamp.h:40
void SetSingleFuncCall(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:74
Oid GetUserId(void)
Definition: miscinit.c:492
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
TupleDesc setDesc
Definition: execnodes.h:317
Tuplestorestate * setResult
Definition: execnodes.h:316
uint8 syncrep_method
Definition: syncrep.h:69
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:725
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
static XLogRecPtr sentPtr
Definition: walsender.c:155
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3456
#define PG_STAT_GET_WAL_SENDERS_COLS
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3437
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, Int32GetDatum, IntervalPGetDatum, j, LSNGetDatum, max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, sentPtr, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SetSingleFuncCall(), SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum, tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2044 of file walsender.c.

2045 {
2046  bool changed = false;
2048 
2049  Assert(lsn != InvalidXLogRecPtr);
2050  SpinLockAcquire(&slot->mutex);
2051  if (slot->data.restart_lsn != lsn)
2052  {
2053  changed = true;
2054  slot->data.restart_lsn = lsn;
2055  }
2056  SpinLockRelease(&slot->mutex);
2057 
2058  if (changed)
2059  {
2062  }
2063 
2064  /*
2065  * One could argue that the slot should be saved to disk now, but that'd
2066  * be energy wasted - the worst thing lost information can do here is give
2067  * us wrong information in a statistics view - we'll just potentially be
2068  * more conservative in removing files.
2069  */
2070 }
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:887
XLogRecPtr restart_lsn
Definition: slot.h:73
slock_t mutex
Definition: slot.h:120

References Assert(), ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2181 of file walsender.c.

2182 {
2183  bool changed = false;
2185 
2186  SpinLockAcquire(&slot->mutex);
2188 
2189  /*
2190  * For physical replication we don't need the interlock provided by xmin
2191  * and effective_xmin since the consequences of a missed increase are
2192  * limited to query cancellations, so set both at once.
2193  */
2194  if (!TransactionIdIsNormal(slot->data.xmin) ||
2195  !TransactionIdIsNormal(feedbackXmin) ||
2196  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2197  {
2198  changed = true;
2199  slot->data.xmin = feedbackXmin;
2200  slot->effective_xmin = feedbackXmin;
2201  }
2202  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2203  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2204  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2205  {
2206  changed = true;
2207  slot->data.catalog_xmin = feedbackCatalogXmin;
2208  slot->effective_catalog_xmin = feedbackCatalogXmin;
2209  }
2210  SpinLockRelease(&slot->mutex);
2211 
2212  if (changed)
2213  {
2216  }
2217 }
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:837
TransactionId xmin
Definition: proc.h:176
TransactionId xmin
Definition: slot.h:62
TransactionId catalog_xmin
Definition: slot.h:70
TransactionId effective_catalog_xmin
Definition: slot.h:144
TransactionId effective_xmin
Definition: slot.h:143
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1426 of file walsender.c.

1427 {
1428  for (;;)
1429  {
1430  long sleeptime;
1431 
1432  /* Check for input from the client */
1434 
1435  /* die if timeout was reached */
1437 
1438  /* Send keepalive if the time has come */
1440 
1441  if (!pq_is_send_pending())
1442  break;
1443 
1445 
1446  /* Sleep until something happens or we time out */
1449 
1450  /* Clear any already-pending wakeups */
1452 
1454 
1455  /* Process any requests or signals received recently */
1456  if (ConfigReloadPending)
1457  {
1458  ConfigReloadPending = false;
1461  }
1462 
1463  /* Try to flush pending output to the client */
1464  if (pq_flush_if_writable() != 0)
1465  WalSndShutdown();
1466  }
1467 
1468  /* reactivate latch so WalSndLoop knows to continue */
1469  SetLatch(MyLatch);
1470 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
struct Latch * MyLatch
Definition: globals.c:58
@ PGC_SIGHUP
Definition: guc.h:72
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:566
void ResetLatch(Latch *latch)
Definition: latch.c:658
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:411
@ WAIT_EVENT_WAL_SENDER_WRITE_DATA
Definition: wait_event.h:69
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3337
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2429
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1900
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3667
static void WalSndShutdown(void)
Definition: walsender.c:230
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2385

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WRITE_DATA, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1900 of file walsender.c.

/*
 * ProcessRepliesIfAny
 *
 * Consume every message from the standby that can be read without
 * blocking and dispatch it on its protocol type byte:
 *   'd' - CopyData carrying a standby message; ProcessStandbyMessage()
 *   'c' - CopyDone; answered with CopyDone unless one was already sent,
 *         and no further messages are read here afterwards
 *   'X' - the standby is closing the socket; the process exits
 * Any other type byte is a FATAL protocol violation.  EOF or a read error
 * is reported at COMMERROR and the process exits.  When at least one
 * message was handled, last_reply_timestamp is set to the time this call
 * started and waiting_for_ping_response is cleared.
 */
1901 {
1902  unsigned char firstchar;
1903  int maxmsglen;
1904  int r;
1905  bool received = false;
1906 
1907  last_processing = GetCurrentTimestamp();
1908 
1909  /*
1910  * If we already received a CopyDone from the frontend, any subsequent
1911  * message is the beginning of a new command, and should be processed in
1912  * the main processing loop.
1913  */
1914  while (!streamingDoneReceiving)
1915  {
1916  pq_startmsgread();
1917  r = pq_getbyte_if_available(&firstchar);
1918  if (r < 0)
1919  {
1920  /* unexpected error or EOF */
1921  ereport(COMMERROR,
1922  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1923  errmsg("unexpected EOF on standby connection")));
1924  proc_exit(0);
1925  }
1926  if (r == 0)
1927  {
1928  /* no data available without blocking */
1929  pq_endmsgread();
1930  break;
1931  }
1932 
1933  /* Validate message type and set packet size limit */
1934  switch (firstchar)
1935  {
1936  case 'd':
1937  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1938  break;
1939  case 'c':
1940  case 'X':
1941  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1942  break;
1943  default:
1944  ereport(FATAL,
1945  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1946  errmsg("invalid standby message type \"%c\"",
1947  firstchar)));
1948  maxmsglen = 0; /* keep compiler quiet */
1949  break;
1950  }
1951 
1952  /* Read the message contents */
1953  resetStringInfo(&reply_message);
1954  if (pq_getmessage(&reply_message, maxmsglen))
1955  {
1956  ereport(COMMERROR,
1957  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1958  errmsg("unexpected EOF on standby connection")));
1959  proc_exit(0);
1960  }
1961 
1962  /* ... and process it */
1963  switch (firstchar)
1964  {
1965  /*
1966  * 'd' means a standby reply wrapped in a CopyData packet.
1967  */
1968  case 'd':
1969  ProcessStandbyMessage();
1970  received = true;
1971  break;
1972 
1973  /*
1974  * CopyDone means the standby requested to finish streaming.
1975  * Reply with CopyDone, if we had not sent that already.
1976  */
1977  case 'c':
1978  if (!streamingDoneSending)
1979  {
1980  pq_putmessage_noblock('c', NULL, 0);
1981  streamingDoneSending = true;
1982  }
1983 
1984  streamingDoneReceiving = true;
1985  received = true;
1986  break;
1987 
1988  /*
1989  * 'X' means that the standby is closing down the socket.
1990  */
1991  case 'X':
1992  proc_exit(0);
1993 
1994  default:
1995  Assert(false); /* NOT REACHED */
1996  }
1997  }
1998 
1999  /*
2000  * Save the last reply timestamp if we've received at least one reply.
2001  */
2002  if (received)
2003  {
2004  last_reply_timestamp = last_processing;
2005  waiting_for_ping_response = false;
2006  }
2007 }
#define COMMERROR
Definition: elog.h:27
#define FATAL
Definition: elog.h:35
void proc_exit(int code)
Definition: ipc.c:104
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1034
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1226
void pq_endmsgread(void)
Definition: pqcomm.c:1188
void pq_startmsgread(void)
Definition: pqcomm.c:1164
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static bool waiting_for_ping_response
Definition: walsender.c:172
static TimestampTz last_processing
Definition: walsender.c:163
static bool streamingDoneSending
Definition: walsender.c:180
static void ProcessStandbyMessage(void)
Definition: walsender.c:2013
static bool streamingDoneReceiving
Definition: walsender.c:181

References Assert(), COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2261 of file walsender.c.

/*
 * ProcessStandbyHSFeedbackMessage
 *
 * Decode a hot standby feedback message from reply_message (reply time,
 * xmin and its epoch, catalog xmin and its epoch), store the reply time in
 * this walsender's WalSnd entry, and apply the reported xmins.  When a
 * replication slot is held they go through
 * PhysicalReplicationSlotNewXmin(); otherwise the older of the two is
 * stored in MyProc->xmin.  If neither xmin is a normal transaction ID the
 * walsender's xmin is reset.  Feedback whose xmin/epoch pair fails
 * TransactionIdInRecentPast() is ignored.
 */
2262 {
2263  TransactionId feedbackXmin;
2264  uint32 feedbackEpoch;
2265  TransactionId feedbackCatalogXmin;
2266  uint32 feedbackCatalogEpoch;
2267  TimestampTz replyTime;
2268 
2269  /*
2270  * Decipher the reply message. The caller already consumed the msgtype
2271  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2272  * of this message.
2273  */
2274  replyTime = pq_getmsgint64(&reply_message);
2275  feedbackXmin = pq_getmsgint(&reply_message, 4);
2276  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2277  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2278  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2279 
2280  if (message_level_is_interesting(DEBUG2))
2281  {
2282  char *replyTimeStr;
2283 
2284  /* Copy because timestamptz_to_str returns a static buffer */
2285  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2286 
2287  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2288  feedbackXmin,
2289  feedbackEpoch,
2290  feedbackCatalogXmin,
2291  feedbackCatalogEpoch,
2292  replyTimeStr);
2293 
2294  pfree(replyTimeStr);
2295  }
2296 
2297  /*
2298  * Update shared state for this WalSender process based on reply data from
2299  * standby.
2300  */
2301  {
2302  WalSnd *walsnd = MyWalSnd;
2303 
2304  SpinLockAcquire(&walsnd->mutex);
2305  walsnd->replyTime = replyTime;
2306  SpinLockRelease(&walsnd->mutex);
2307  }
2308 
2309  /*
2310  * Unset WalSender's xmins if the feedback message values are invalid.
2311  * This happens when the downstream turned hot_standby_feedback off.
2312  */
2313  if (!TransactionIdIsNormal(feedbackXmin)
2314  && !TransactionIdIsNormal(feedbackCatalogXmin))
2315  {
2316  MyProc->xmin = InvalidTransactionId;
2317  if (MyReplicationSlot != NULL)
2318  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2319  return;
2320  }
2321 
2322  /*
2323  * Check that the provided xmin/epoch are sane, that is, not in the future
2324  * and not so far back as to be already wrapped around. Ignore if not.
2325  */
2326  if (TransactionIdIsNormal(feedbackXmin) &&
2327  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2328  return;
2329 
2330  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2331  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2332  return;
2333 
2334  /*
2335  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2336  * the xmin will be taken into account by GetSnapshotData() /
2337  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2338  * thereby prevent the generation of cleanup conflicts on the standby
2339  * server.
2340  *
2341  * There is a small window for a race condition here: although we just
2342  * checked that feedbackXmin precedes nextXid, the nextXid could have
2343  * gotten advanced between our fetching it and applying the xmin below,
2344  * perhaps far enough to make feedbackXmin wrap around. In that case the
2345  * xmin we set here would be "in the future" and have no effect. No point
2346  * in worrying about this since it's too late to save the desired data
2347  * anyway. Assuming that the standby sends us an increasing sequence of
2348  * xmins, this could only happen during the first reply cycle, else our
2349  * own xmin would prevent nextXid from advancing so far.
2350  *
2351  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2352  * is assumed atomic, and there's no real need to prevent concurrent
2353  * horizon determinations. (If we're moving our xmin forward, this is
2354  * obviously safe, and if we're moving it backwards, well, the data is at
2355  * risk already since a VACUUM could already have determined the horizon.)
2356  *
2357  * If we're using a replication slot we reserve the xmin via that,
2358  * otherwise via the walsender's PGPROC entry. We can only track the
2359  * catalog xmin separately when using a slot, so we store the least of the
2360  * two provided when not using a slot.
2361  *
2362  * XXX: It might make sense to generalize the ephemeral slot concept and
2363  * always use the slot mechanism to handle the feedback xmin.
2364  */
2365  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2366  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2367  else
2368  {
2369  if (TransactionIdIsNormal(feedbackCatalogXmin)
2370  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2371  MyProc->xmin = feedbackCatalogXmin;
2372  else
2373  MyProc->xmin = feedbackXmin;
2374  }
2375 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1768
unsigned int uint32
Definition: c.h:441
uint32 TransactionId
Definition: c.h:587
bool message_level_is_interesting(int elevel)
Definition: elog.c:265
#define DEBUG2
Definition: elog.h:23
char * pstrdup(const char *in)
Definition: mcxt.c:1305
void pfree(void *pointer)
Definition: mcxt.c:1175
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2181
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2230

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2013 of file walsender.c.

/*
 * ProcessStandbyMessage
 *
 * Dispatch one standby message held in reply_message on its first byte:
 *   'r' - status reply, handled by ProcessStandbyReplyMessage()
 *   'h' - hot standby feedback, handled by ProcessStandbyHSFeedbackMessage()
 * Any other byte is reported as a protocol violation at COMMERROR and the
 * process exits.
 */
2014 {
2015  char msgtype;
2016 
2017  /*
2018  * Check message type from the first byte.
2019  */
2020  msgtype = pq_getmsgbyte(&reply_message);
2021 
2022  switch (msgtype)
2023  {
2024  case 'r':
2025  ProcessStandbyReplyMessage();
2026  break;
2027 
2028  case 'h':
2029  ProcessStandbyHSFeedbackMessage();
2030  break;
2031 
2032  default:
2033  ereport(COMMERROR,
2034  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2035  errmsg("unexpected message type \"%c\"", msgtype)));
2036  proc_exit(0);
2037  }
2038 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2261
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2076

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2076 of file walsender.c.

/*
 * ProcessStandbyReplyMessage
 *
 * Decode a standby status reply from reply_message (write, flush and apply
 * positions, reply time, and a "reply requested" flag).  Lag times for the
 * three positions are looked up with LagTrackerRead(); a keepalive is sent
 * if the standby asked for a reply; the positions, lag times and reply
 * time are then stored in this walsender's WalSnd entry under its
 * spinlock.  Unless this is a cascading walsender, synchronous replication
 * waiters are released.  When a replication slot is held and the flush
 * position is valid, that position is confirmed on the slot (logical or
 * physical as appropriate).
 */
2077 {
2078  XLogRecPtr writePtr,
2079  flushPtr,
2080  applyPtr;
2081  bool replyRequested;
2082  TimeOffset writeLag,
2083  flushLag,
2084  applyLag;
2085  bool clearLagTimes;
2086  TimestampTz now;
2087  TimestampTz replyTime;
2088 
2089  static bool fullyAppliedLastTime = false;
2090 
2091  /* the caller already consumed the msgtype byte */
2092  writePtr = pq_getmsgint64(&reply_message);
2093  flushPtr = pq_getmsgint64(&reply_message);
2094  applyPtr = pq_getmsgint64(&reply_message);
2095  replyTime = pq_getmsgint64(&reply_message);
2096  replyRequested = pq_getmsgbyte(&reply_message);
2097 
2098  if (message_level_is_interesting(DEBUG2))
2099  {
2100  char *replyTimeStr;
2101 
2102  /* Copy because timestamptz_to_str returns a static buffer */
2103  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2104 
2105  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2106  LSN_FORMAT_ARGS(writePtr),
2107  LSN_FORMAT_ARGS(flushPtr),
2108  LSN_FORMAT_ARGS(applyPtr),
2109  replyRequested ? " (reply requested)" : "",
2110  replyTimeStr);
2111 
2112  pfree(replyTimeStr);
2113  }
2114 
2115  /* See if we can compute the round-trip lag for these positions. */
2116  now = GetCurrentTimestamp();
2117  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2118  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2119  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2120 
2121  /*
2122  * If the standby reports that it has fully replayed the WAL in two
2123  * consecutive reply messages, then the second such message must result
2124  * from wal_receiver_status_interval expiring on the standby. This is a
2125  * convenient time to forget the lag times measured when it last
2126  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2127  * until more WAL traffic arrives.
2128  */
2129  clearLagTimes = false;
2130  if (applyPtr == sentPtr)
2131  {
2132  if (fullyAppliedLastTime)
2133  clearLagTimes = true;
2134  fullyAppliedLastTime = true;
2135  }
2136  else
2137  fullyAppliedLastTime = false;
2138 
2139  /* Send a reply if the standby requested one. */
2140  if (replyRequested)
2141  WalSndKeepalive(false, InvalidXLogRecPtr);
2142 
2143  /*
2144  * Update shared state for this WalSender process based on reply data from
2145  * standby.
2146  */
2147  {
2148  WalSnd *walsnd = MyWalSnd;
2149 
2150  SpinLockAcquire(&walsnd->mutex);
2151  walsnd->write = writePtr;
2152  walsnd->flush = flushPtr;
2153  walsnd->apply = applyPtr;
2154  if (writeLag != -1 || clearLagTimes)
2155  walsnd->writeLag = writeLag;
2156  if (flushLag != -1 || clearLagTimes)
2157  walsnd->flushLag = flushLag;
2158  if (applyLag != -1 || clearLagTimes)
2159  walsnd->applyLag = applyLag;
2160  walsnd->replyTime = replyTime;
2161  SpinLockRelease(&walsnd->mutex);
2162  }
2163 
2164  if (!am_cascading_walsender)
2165  SyncRepReleaseWaiters();
2166 
2167  /*
2168  * Advance our local xmin horizon when the client confirmed a flush.
2169  */
2170  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2171  {
2172  if (SlotIsLogical(MyReplicationSlot))
2173  LogicalConfirmReceivedLocation(flushPtr);
2174  else
2175  PhysicalConfirmReceivedLocation(flushPtr);
2176  }
2177 }
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1736
#define SlotIsLogical(slot)
Definition: slot.h:169
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:440
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2044
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:3644
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3770

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 479 of file walsender.c.

/*
 * ReadReplicationSlot
 *
 * Answer a READ_REPLICATION_SLOT command with a single row of three
 * columns: slot_type (text), restart_lsn (text) and restart_tli (int8).
 * Every column starts out NULL and stays NULL when the named slot does not
 * exist or is not in use.  For an existing slot the contents are copied
 * under the slot's spinlock; a slot with a valid database OID (a logical
 * slot) raises an ERROR.  restart_lsn and restart_tli are filled in only
 * when the slot's restart_lsn is valid.
 */
480 {
481 #define READ_REPLICATION_SLOT_COLS 3
482  ReplicationSlot *slot;
483  DestReceiver *dest;
484  TupOutputState *tstate;
485  TupleDesc tupdesc;
486  Datum values[READ_REPLICATION_SLOT_COLS];
487  bool nulls[READ_REPLICATION_SLOT_COLS];
488 
489  tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
490  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
491  TEXTOID, -1, 0);
492  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
493  TEXTOID, -1, 0);
494  /* TimeLineID is unsigned, so int4 is not wide enough. */
495  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
496  INT8OID, -1, 0);
497 
498  MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum));
499  MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
500 
501  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
502  slot = SearchNamedReplicationSlot(cmd->slotname, false);
503  if (slot == NULL || !slot->in_use)
504  {
505  LWLockRelease(ReplicationSlotControlLock);
506  }
507  else
508  {
509  ReplicationSlot slot_contents;
510  int i = 0;
511 
512  /* Copy slot contents while holding spinlock */
513  SpinLockAcquire(&slot->mutex);
514  slot_contents = *slot;
515  SpinLockRelease(&slot->mutex);
516  LWLockRelease(ReplicationSlotControlLock);
517 
518  if (OidIsValid(slot_contents.data.database))
519  ereport(ERROR,
520  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
521  errmsg("cannot use \"%s\" with logical replication slot \"%s\"",
522  "READ_REPLICATION_SLOT",
523  NameStr(slot_contents.data.name)));
524 
525  /* slot type */
526  values[i] = CStringGetTextDatum("physical");
527  nulls[i] = false;
528  i++;
529 
530  /* start LSN */
531  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
532  {
533  char xloc[64];
534 
535  snprintf(xloc, sizeof(xloc), "%X/%X",
536  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
537  values[i] = CStringGetTextDatum(xloc);
538  nulls[i] = false;
539  }
540  i++;
541 
542  /* timeline this WAL was produced on */
543  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
544  {
545  TimeLineID slots_position_timeline;
546  TimeLineID current_timeline;
547  List *timeline_history = NIL;
548 
549  /*
550  * While in recovery, use as timeline the currently-replaying one
551  * to get the LSN position's history.
552  */
553  if (RecoveryInProgress())
554  (void) GetXLogReplayRecPtr(&current_timeline);
555  else
556  current_timeline = GetWALInsertionTimeLine();
557 
558  timeline_history = readTimeLineHistory(current_timeline);
559  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
560  timeline_history);
561  values[i] = Int64GetDatum((int64) slots_position_timeline);
562  nulls[i] = false;
563  }
564  i++;
565 
566  Assert(i == READ_REPLICATION_SLOT_COLS);
567  }
568 
569  dest = CreateDestReceiver(DestRemoteSimple);
570  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
571  do_tup_output(tstate, values, nulls);
572  end_tup_output(tstate);
573 }
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:552
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
#define OidIsValid(objectId)
Definition: c.h:710
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1683
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1196
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1800
@ LW_SHARED
Definition: lwlock.h:105
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:377
Definition: pg_list.h:51
bool in_use
Definition: slot.h:123
#define READ_REPLICATION_SLOT_COLS

References Assert(), begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), MemSet, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 580 of file walsender.c.

/*
 * SendTimeLineHistory
 *
 * Send the history file of timeline cmd->timeline to the client as a
 * result set of one row and two text columns: "filename" and "content".
 * The RowDescription and DataRow messages are built by hand.  The file
 * length is found by seeking to the end, sent as the second column's
 * length, and the file is then streamed into the DataRow in block-sized
 * reads.  Failure to open, seek, read or close the file raises an ERROR;
 * hitting end-of-file before the measured length has been read is
 * reported as data corruption.
 */
581 {
582  StringInfoData buf;
583  char histfname[MAXFNAMELEN];
584  char path[MAXPGPATH];
585  int fd;
586  off_t histfilelen;
587  off_t bytesleft;
588  Size len;
589 
590  /*
591  * Reply with a result set with one row, and two columns. The first col is
592  * the name of the history file, 2nd is the contents.
593  */
594 
595  TLHistoryFileName(histfname, cmd->timeline);
596  TLHistoryFilePath(path, cmd->timeline);
597 
598  /* Send a RowDescription message */
599  pq_beginmessage(&buf, 'T');
600  pq_sendint16(&buf, 2); /* 2 fields */
601 
602  /* first field */
603  pq_sendstring(&buf, "filename"); /* col name */
604  pq_sendint32(&buf, 0); /* table oid */
605  pq_sendint16(&buf, 0); /* attnum */
606  pq_sendint32(&buf, TEXTOID); /* type oid */
607  pq_sendint16(&buf, -1); /* typlen */
608  pq_sendint32(&buf, 0); /* typmod */
609  pq_sendint16(&buf, 0); /* format code */
610 
611  /* second field */
612  pq_sendstring(&buf, "content"); /* col name */
613  pq_sendint32(&buf, 0); /* table oid */
614  pq_sendint16(&buf, 0); /* attnum */
615  pq_sendint32(&buf, TEXTOID); /* type oid */
616  pq_sendint16(&buf, -1); /* typlen */
617  pq_sendint32(&buf, 0); /* typmod */
618  pq_sendint16(&buf, 0); /* format code */
619  pq_endmessage(&buf);
620 
621  /* Send a DataRow message */
622  pq_beginmessage(&buf, 'D');
623  pq_sendint16(&buf, 2); /* # of columns */
624  len = strlen(histfname);
625  pq_sendint32(&buf, len); /* col1 len */
626  pq_sendbytes(&buf, histfname, len);
627 
628  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
629  if (fd < 0)
630  ereport(ERROR,
631  (errcode_for_file_access(),
632  errmsg("could not open file \"%s\": %m", path)));
633 
634  /* Determine file length and send it to client */
635  histfilelen = lseek(fd, 0, SEEK_END);
636  if (histfilelen < 0)
637  ereport(ERROR,
638  (errcode_for_file_access(),
639  errmsg("could not seek to end of file \"%s\": %m", path)));
640  if (lseek(fd, 0, SEEK_SET) != 0)
641  ereport(ERROR,
642  (errcode_for_file_access(),
643  errmsg("could not seek to beginning of file \"%s\": %m", path)));
644 
645  pq_sendint32(&buf, histfilelen); /* col2 len */
646 
647  bytesleft = histfilelen;
648  while (bytesleft > 0)
649  {
650  PGAlignedBlock rbuf;
651  int nread;
652 
653  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
654  nread = read(fd, rbuf.data, sizeof(rbuf));
655  pgstat_report_wait_end();
656  if (nread < 0)
657  ereport(ERROR,
658  (errcode_for_file_access(),
659  errmsg("could not read file \"%s\": %m",
660  path)));
661  else if (nread == 0)
662  ereport(ERROR,
663  (errcode(ERRCODE_DATA_CORRUPTED),
664  errmsg("could not read file \"%s\": read %d of %zu",
665  path, nread, (Size) bytesleft)));
666 
667  pq_sendbytes(&buf, rbuf.data, nread);
668  bytesleft -= nread;
669  }
670 
671  if (CloseTransientFile(fd) != 0)
672  ereport(ERROR,
673  (errcode_for_file_access(),
674  errmsg("could not close file \"%s\": %m", path)));
675 
676  pq_endmessage(&buf);
677 }
#define PG_BINARY
Definition: c.h:1268
size_t Size
Definition: c.h:540
int errcode_for_file_access(void)
Definition: elog.c:716
int CloseTransientFile(int fd)
Definition: fd.c:2688
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2511
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:43
#define MAXPGPATH
const void size_t len
static char * buf
Definition: pg_test_fsync.c:67
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
TimeLineID timeline
Definition: replnodes.h:108
char data[BLCKSZ]
Definition: c.h:1138
@ WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ
Definition: wait_event.h:221
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:266
static void pgstat_report_wait_end(void)
Definition: wait_event.h:282
#define TLHistoryFileName(fname, tli)
#define TLHistoryFilePath(path, tli)

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1257 of file walsender.c.

/*
 * StartLogicalReplication
 *
 * Begin logical replication streaming from the slot named in cmd.  The
 * slot is acquired and rejected with an ERROR if its restart_lsn is
 * invalid.  A decoding context is created starting at cmd->startpoint
 * before the CopyBothResponse is sent, so that errors surface early.  WAL
 * reading then begins at the slot's restart_lsn, sentPtr is initialised
 * from the slot's confirmed_flush, and WalSndLoop() runs with
 * XLogSendLogical as the send callback.  When the loop returns, the
 * decoding context and the slot are released and a COPY command completion
 * is sent; if got_STOPPING is set the process exits instead.
 */
1258 {
1259  StringInfoData buf;
1260  QueryCompletion qc;
1261 
1262  /* make sure that our requirements are still fulfilled */
1263  CheckLogicalDecodingRequirements();
1264 
1265  Assert(!MyReplicationSlot);
1266 
1267  ReplicationSlotAcquire(cmd->slotname, true);
1268 
1269  if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
1270  ereport(ERROR,
1271  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1272  errmsg("cannot read from logical replication slot \"%s\"",
1273  cmd->slotname),
1274  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1275 
1276  /*
1277  * Force a disconnect, so that the decoding code doesn't need to care
1278  * about an eventual switch from running in recovery, to running in a
1279  * normal environment. Client code is expected to handle reconnects.
1280  */
1281  if (am_cascading_walsender && !RecoveryInProgress())
1282  {
1283  ereport(LOG,
1284  (errmsg("terminating walsender process after promotion")));
1285  got_STOPPING = true;
1286  }
1287 
1288  /*
1289  * Create our decoding context, making it start at the previously ack'ed
1290  * position.
1291  *
1292  * Do this before sending a CopyBothResponse message, so that any errors
1293  * are reported early.
1294  */
1295  logical_decoding_ctx =
1296  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1297  XL_ROUTINE(.page_read = logical_read_xlog_page,
1298  .segment_open = WalSndSegmentOpen,
1299  .segment_close = wal_segment_close),
1300  WalSndPrepareWrite, WalSndWriteData,
1301  WalSndUpdateProgress);
1302  xlogreader = logical_decoding_ctx->reader;
1303 
1304  WalSndSetState(WALSNDSTATE_CATCHUP);
1305 
1306  /* Send a CopyBothResponse message, and start streaming */
1307  pq_beginmessage(&buf, 'W');
1308  pq_sendbyte(&buf, 0);
1309  pq_sendint16(&buf, 0);
1310  pq_endmessage(&buf);
1311  pq_flush();
1312 
1313  /* Start reading WAL from the oldest required WAL. */
1314  XLogBeginRead(logical_decoding_ctx->reader,
1315  MyReplicationSlot->data.restart_lsn);
1316 
1317  /*
1318  * Report the location after which we'll send out further commits as the
1319  * current sentPtr.
1320  */
1321  sentPtr = MyReplicationSlot->data.confirmed_flush;
1322 
1323  /* Also update the sent position status in shared memory */
1324  SpinLockAcquire(&MyWalSnd->mutex);
1325  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1326  SpinLockRelease(&MyWalSnd->mutex);
1327 
1328  replication_active = true;
1329 
1330  SyncRepInitConfig();
1331 
1332  /* Main loop of walsender */
1333  WalSndLoop(XLogSendLogical);
1334 
1335  FreeDecodingContext(logical_decoding_ctx);
1336  ReplicationSlotRelease();
1337 
1338  replication_active = false;
1339  if (got_STOPPING)
1340  proc_exit(0);
1341  WalSndSetState(WALSNDSTATE_STARTUP);
1342 
1343  /* Get out of COPY mode (CommandComplete). */
1344  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1345  EndCommand(&qc, DestRemote, false);
1346 }
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
@ DestRemote
Definition: dest.h:91
int errdetail(const char *fmt,...)
Definition: elog.c:1037
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:479
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:425
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:85
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2456
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static void XLogSendLogical(void)
Definition: walsender.c:3038
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:264

References am_cascading_walsender, Assert(), buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, XLogRecPtrIsInvalid, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 686 of file walsender.c.

687 {
689  XLogRecPtr FlushPtr;
690  TimeLineID FlushTLI;
691 
692  /* create xlogreader for physical replication */
693  xlogreader =
695  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
696  .segment_close = wal_segment_close),
697  NULL);
698 
699  if (!xlogreader)
700  ereport(ERROR,
701  (errcode(ERRCODE_OUT_OF_MEMORY),
702  errmsg("out of memory"),
703  errdetail("Failed while allocating a WAL reading processor.")));
704 
705  /*
706  * We assume here that we're logging enough information in the WAL for
707  * log-shipping, since this is checked in PostmasterMain().
708  *
709  * NOTE: wal_level can only change at shutdown, so in most cases it is
710  * difficult for there to be WAL data that we can still see that was
711  * written at wal_level='minimal'.
712  */
713 
714  if (cmd->slotname)
715  {
716  ReplicationSlotAcquire(cmd->slotname, true);
718  ereport(ERROR,
719  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
720  errmsg("cannot use a logical replication slot for physical replication")));
721 
722  /*
723  * We don't need to verify the slot's restart_lsn here; instead we
724  * rely on the caller requesting the starting point to use. If the
725  * WAL segment doesn't exist, we'll fail later.
726  */
727  }
728 
729  /*
730  * Select the timeline. If it was given explicitly by the client, use
731  * that. Otherwise use the timeline of the last replayed record.
732  */
735  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
736  else
737  FlushPtr = GetFlushRecPtr(&FlushTLI);
738 
739  if (cmd->timeline != 0)
740  {
741  XLogRecPtr switchpoint;
742 
743  sendTimeLine = cmd->timeline;
744  if (sendTimeLine == FlushTLI)
745  {
746  sendTimeLineIsHistoric = false;
748  }
749  else
750  {
751  List *timeLineHistory;
752 
753  sendTimeLineIsHistoric = true;
754 
755  /*
756  * Check that the timeline the client requested exists, and the
757  * requested start location is on that timeline.
758  */
759  timeLineHistory = readTimeLineHistory(FlushTLI);
760  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
762  list_free_deep(timeLineHistory);
763 
764  /*
765  * Found the requested timeline in the history. Check that
766  * requested startpoint is on that timeline in our history.
767  *
768  * This is quite loose on purpose. We only check that we didn't
769  * fork off the requested timeline before the switchpoint. We
770  * don't check that we switched *to* it before the requested
771  * starting point. This is because the client can legitimately
772  * request to start replication from the beginning of the WAL
773  * segment that contains switchpoint, but on the new timeline, so
774  * that it doesn't end up with a partial segment. If you ask for
775  * too old a starting point, you'll get an error later when we
776  * fail to find the requested WAL segment in pg_wal.
777  *
778  * XXX: we could be more strict here and only allow a startpoint
779  * that's older than the switchpoint, if it's still in the same
780  * WAL segment.
781  */
782  if (!XLogRecPtrIsInvalid(switchpoint) &&
783  switchpoint < cmd->startpoint)
784  {
785  ereport(ERROR,
786  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
788  cmd->timeline),
789  errdetail("This server's history forked from timeline %u at %X/%X.",
790  cmd->timeline,
791  LSN_FORMAT_ARGS(switchpoint))));
792  }
793  sendTimeLineValidUpto = switchpoint;
794  }
795  }
796  else
797  {
798  sendTimeLine = FlushTLI;
800  sendTimeLineIsHistoric = false;
801  }
802 
804 
805  /* If there is nothing to stream, don't even enter COPY mode */
807  {
808  /*
809  * When we first start replication the standby will be behind the
810  * primary. For some applications, for example synchronous
811  * replication, it is important to have a clear state for this initial
812  * catchup mode, so we can trigger actions when we change streaming
813  * state later. We may stay in this state for a long time, which is
814  * exactly why we want to be able to monitor whether or not we are
815  * still here.
816  */
818 
819  /* Send a CopyBothResponse message, and start streaming */
820  pq_beginmessage(&buf, 'W');
821  pq_sendbyte(&buf, 0);
822  pq_sendint16(&buf, 0);
823  pq_endmessage(&buf);
824  pq_flush();
825 
826  /*
827  * Don't allow a request to stream from a future point in WAL that
828  * hasn't been flushed to disk in this server yet.
829  */
830  if (FlushPtr < cmd->startpoint)
831  {
832  ereport(ERROR,
833  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
835  LSN_FORMAT_ARGS(FlushPtr))));
836  }
837 
838  /* Start streaming from the requested point */
839  sentPtr = cmd->startpoint;
840 
841  /* Initialize shared memory status, too */
845 
847 
848  /* Main loop of walsender */
849  replication_active = true;
850 
852 
853  replication_active = false;
854  if (got_STOPPING)
855  proc_exit(0);
857 
859  }
860 
861  if (cmd->slotname)
863 
864  /*
865  * Copy is finished now. Send a single-row result set indicating the next
866  * timeline.
867  */
869  {
870  char startpos_str[8 + 1 + 8 + 1];
872  TupOutputState *tstate;
873  TupleDesc tupdesc;
874  Datum values[2];
875  bool nulls[2];
876 
877  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
879 
881  MemSet(nulls, false, sizeof(nulls));
882 
883  /*
884  * Need a tuple descriptor representing two columns. int8 may seem
885  * like a surprising data type for this, but in theory int4 would not
886  * be wide enough for this, as TimeLineID is unsigned.
887  */
888  tupdesc = CreateTemplateTupleDesc(2);
889  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
890  INT8OID, -1, 0);
891  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
892  TEXTOID, -1, 0);
893 
894  /* prepare for projection of tuple */
895  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
896 
898  values[1] = CStringGetTextDatum(startpos_str);
899 
900  /* send it to dest */
901  do_tup_output(tstate, values, nulls);
902 
903  end_tup_output(tstate);
904  }
905 
906  /* Send CommandComplete message */
907  EndReplicationCommand("START_STREAMING");
908 }
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
void list_free_deep(List *list)
Definition: list.c:1519
TimeLineID timeline
Definition: replnodes.h:84
static void XLogSendPhysical(void)
Definition: walsender.c:2738
int wal_segment_size
Definition: xlog.c:144
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:108

References am_cascading_walsender, Assert(), begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, MemSet, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2230 of file walsender.c.

2231 {
      /*
       * Decide whether the (epoch, xid) pair supplied by the caller is
       * consistent with the server's current next-XID counter.  Returns true
       * only when the epoch matches what the counter implies for this xid and
       * the xid does not lie ahead of the counter.
       */
2232  FullTransactionId nextFullXid;
2233  TransactionId nextXid;
2234  uint32 nextEpoch;
2235 
      /* Take one reading of the counter and split it into xid and epoch. */
2236  nextFullXid = ReadNextFullTransactionId();
2237  nextXid = XidFromFullTransactionId(nextFullXid);
2238  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2239 
      /*
       * Plain numeric comparison: an xid at or below nextXid must carry the
       * current epoch; a numerically larger xid is only acceptable if it was
       * assigned in the immediately preceding epoch.
       */
2240  if (xid <= nextXid)
2241  {
2242  if (epoch != nextEpoch)
2243  return false;
2244  }
2245  else
2246  {
2247  if (epoch + 1 != nextEpoch)
2248  return false;
2249  }
2250 
      /* Second check uses TransactionIdPrecedesOrEquals() rather than "<=". */
2251  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2252  return false; /* epoch OK, but it's wrapped around */
2253 
2254  return true;
2255 }
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2429 of file walsender.c.

2430 {
      /*
       * Shut the walsender down when the replication timeout has expired.
       * Does nothing while last_reply_timestamp is <= 0.
       */
2431  TimestampTz timeout;
2432 
2433  /* don't bail out if we're doing something that doesn't require timeouts */
2434  if (last_reply_timestamp <= 0)
2435  return;
2436 
      /*
       * NOTE(review): source lines 2437-2438 are absent from this listing.
       * Per the References below they presumably assign "timeout" via
       * TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout)
       * -- confirm against walsender.c.
       */
2439 
      /* A wal_sender_timeout of zero or less disables the check. */
2440  if (wal_sender_timeout > 0 && last_processing >= timeout)
2441  {
2442  /*
2443  * Since typically expiration of replication timeout means
2444  * communication problem, we don't send the error message to the
2445  * standby.
2446  */
      /*
       * NOTE(review): source line 2447 (the opening of the report call) is
       * absent; the References list suggests ereport(COMMERROR, ...) --
       * confirm.
       */
2448  (errmsg("terminating walsender process due to replication timeout")));
2449 
2450  WalSndShutdown();
2451  }
2452 }
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int wal_sender_timeout
Definition: walsender.c:124

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2385 of file walsender.c.

2386 {
      /*
       * Compute how long, in milliseconds, the caller may sleep starting at
       * "now".  The default of 10000 ms is returned unless the guarded block
       * below replaces it with the time remaining until wakeup_time.
       */
2387  long sleeptime = 10000; /* 10 s */
2388 
      /*
       * NOTE(review): source line 2389 (the "if" guarding this block) is
       * absent from this listing.  WalSndCheckTimeOut() and
       * WalSndKeepaliveIfNecessary() gate on wal_sender_timeout and
       * last_reply_timestamp; presumably this does too -- confirm.
       */
2390  {
2391  TimestampTz wakeup_time;
2392 
2393  /*
2394  * At the latest stop sleeping once wal_sender_timeout has been
2395  * reached.
2396  */
      /*
       * NOTE(review): source lines 2397-2398 (the assignment of wakeup_time)
       * are absent from this listing.
       */
2399 
2400  /*
2401  * If no ping has been sent yet, wakeup when it's time to do so.
2402  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2403  * the timeout passed without a response.
2404  */
      /*
       * NOTE(review): source lines 2405-2406 are absent; only the final
       * argument (wal_sender_timeout / 2) of the statement survives below.
       * The References list names waiting_for_ping_response, which is
       * presumably the test applied here -- confirm.
       */
2407  wal_sender_timeout / 2);
2408 
2409  /* Compute relative time until wakeup. */
2410  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2411  }
2412 
2413  return sleeptime;
2414 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1687

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3119 of file walsender.c.

3120 {
      /*
       * Final-cycle handling: call send_data once more and, if the receiver
       * has confirmed everything that was sent and no output is pending,
       * send the command completion and exit the process.  Otherwise return
       * to the caller.
       */
3121  XLogRecPtr replicatedPtr;
3122 
3123  /* ... let's just be real sure we're caught up ... */
3124  send_data();
3125 
3126  /*
3127  * To figure out whether all WAL has successfully been replicated, check
3128  * flush location if valid, write otherwise. Tools like pg_receivewal will
3129  * usually (unless in synchronous mode) return an invalid flush location.
3130  */
3131  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
      /*
       * NOTE(review): source line 3132 (both arms of the conditional) is
       * absent from this listing.  Going by the comment above and the
       * References list, it presumably selects MyWalSnd->write when flush is
       * invalid and MyWalSnd->flush otherwise -- confirm.
       */
3133 
      /* All three must hold: caught up, receiver at sentPtr, nothing queued. */
3134  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3135  !pq_is_send_pending())
3136  {
3137  QueryCompletion qc;
3138 
3139  /* Inform the standby that XLOG streaming is done */
3140  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3141  EndCommand(&qc, DestRemote, false);
3142  pq_flush();
3143 
3144  proc_exit(0);
3145  }
      /*
       * NOTE(review): source lines 3146-3147 are absent.  The References
       * list names waiting_for_ping_response and WalSndKeepalive(), so the
       * not-yet-done path presumably requests a reply from the receiver --
       * confirm.
       */
3148 }
static bool WalSndCaughtUp
Definition: walsender.c:184

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 315 of file walsender.c.

316 {
     /*
      * Error-recovery cleanup for a walsender (referenced by PostgresMain()
      * per the cross-references below).  Marks replication inactive, cleans
      * up a resource owner left behind outside a transaction, and exits the
      * process if a stop was requested.
      *
      * NOTE(review): source lines 317-319, 322, 325, 327, 336 and 343 are
      * absent from this listing, so several statements and the bodies of the
      * "if" tests below are not visible.  The References list names
      * LWLockReleaseAll(), ConditionVariableCancelSleep(),
      * pgstat_report_wait_end(), wal_segment_close(),
      * ReplicationSlotRelease(), ReplicationSlotCleanup(),
      * IsTransactionOrTransactionBlock() and
      * WalSndSetState(WALSNDSTATE_STARTUP) as the calls made here -- confirm
      * their order against walsender.c.
      */
320 
     /* Only relevant when the reader exists and has a segment file open. */
321  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
323 
324  if (MyReplicationSlot != NULL)
326 
328 
329  replication_active = false;
330 
331  /*
332  * If there is a transaction in progress, it will clean up our
333  * ResourceOwner, but if a replication command set up a resource owner
334  * without a transaction, we've got to clean that up now.
335  */
337  WalSndResourceCleanup(false);
338 
     /* A pending stop request or SIGUSR2 ends the process here. */
339  if (got_STOPPING || got_SIGUSR2)
340  proc_exit(0);
341 
342  /* Revert back to startup state */
344 }
void ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1899
void ReplicationSlotCleanup(void)
Definition: slot.c:578
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:350
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4784

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3437 of file walsender.c.

3438 {
      /*
       * Translate a WalSndState value into a constant, lower-case string
       * (referenced by pg_stat_get_wal_senders() per the cross-references
       * below).  The switch has no default label; a value matching none of
       * the cases falls out of the switch and yields "UNKNOWN".
       */
3439  switch (state)
3440  {
3441  case WALSNDSTATE_STARTUP:
3442  return "startup";
3443  case WALSNDSTATE_BACKUP:
3444  return "backup";
3445  case WALSNDSTATE_CATCHUP:
3446  return "catchup";
3447  case WALSNDSTATE_STREAMING:
3448  return "streaming";
3449  case WALSNDSTATE_STOPPING:
3450  return "stopping";
3451  }
3452  return "UNKNOWN";
3453 }
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3354 of file walsender.c.

3355 {
      /*
       * Visit every walsender slot in shared memory and act on each one that
       * currently has an owning process (pid != 0).  Referenced by
       * ShutdownXLOG() per the cross-references below.
       */
3356  int i;
3357 
3358  for (i = 0; i < max_wal_senders; i++)
3359  {
3360  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3361  pid_t pid;
3362 
      /* Copy the pid under the slot's spinlock; use the copy afterwards. */
3363  SpinLockAcquire(&walsnd->mutex);
3364  pid = walsnd->pid;
3365  SpinLockRelease(&walsnd->mutex);
3366 
      /* pid == 0 marks a slot that is not in use. */
3367  if (pid == 0)
3368  continue;
3369 
      /*
       * NOTE(review): source line 3370 is absent from this listing.  Per the
       * References below it presumably calls SendProcSignal(pid,
       * PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId) -- confirm against
       * walsender.c.
       */
3371  }
3372 }
#define InvalidBackendId
Definition: backendid.h:23
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 3644 of file walsender.c.

3645 {
      /*
       * Build a keepalive message in output_message and queue it for the
       * receiver.  The visible fields are: the byte 'k'; a 64-bit position
       * (writePtr, or sentPtr when writePtr is invalid); and a final byte that
       * is 1 when requestReply is true, 0 otherwise.
       *
       * NOTE(review): source lines 3649, 3652, 3656 and 3660 are absent from
       * this listing.  Per the References below they presumably reset
       * output_message, append GetCurrentTimestamp(), hand the buffer to
       * pq_putmessage_noblock(), and set waiting_for_ping_response --
       * confirm against walsender.c.
       */
3646  elog(DEBUG2, "sending replication keepalive");
3647 
3648  /* construct the message... */
3650  pq_sendbyte(&output_message, 'k');
3651  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3653  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3654 
3655  /* ... and send it wrapped in CopyData */
3657 
3658  /* Set local flag */
3659  if (requestReply)
3661 }
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3667 of file walsender.c.

3668 {
      /*
       * Send a keepalive once last_processing has reached ping_time, then
       * try to flush it; a flush failure shuts the walsender down.
       */
3669  TimestampTz ping_time;
3670 
3671  /*
3672  * Don't send keepalive messages if timeouts are globally disabled or
3673  * we're doing something not partaking in timeouts.
3674  */
3675  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3676  return;
3677 
      /*
       * NOTE(review): source line 3678 (the condition for this early return)
       * is absent from this listing; the References list names
       * waiting_for_ping_response, presumably the flag tested -- confirm.
       */
3679  return;
3680 
3681  /*
3682  * If half of wal_sender_timeout has lapsed without receiving any reply
3683  * from the standby, send a keep-alive message to the standby requesting
3684  * an immediate reply.
3685  */
      /*
       * NOTE(review): source line 3686 (start of the ping_time assignment)
       * is absent; presumably
       * ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, ...).
       */
3687  wal_sender_timeout / 2);
3688  if (last_processing >= ping_time)
3689  {
      /*
       * NOTE(review): source line 3690 is absent; per the References list
       * (WalSndKeepalive, InvalidXLogRecPtr) it presumably calls
       * WalSndKeepalive(true, InvalidXLogRecPtr) -- confirm.
       */
3691 
3692  /* Try to flush pending output to the client */
3693  if (pq_flush_if_writable() != 0)
3694  WalSndShutdown();
3695  }
3696 }

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2642 of file walsender.c.

2643 {
      /*
       * Give up this process's walsender slot: forget the process-local
       * pointer, then clear the slot's latch pointer and pid under its
       * spinlock.  Neither "code" nor "arg" is used in the body.
       *
       * NOTE(review): the (int, Datum) signature and the reference from
       * InitWalSenderSlot() suggest this is registered as an exit callback --
       * confirm at the registration site.
       */
2644  WalSnd *walsnd = MyWalSnd;
2645 
2646  Assert(walsnd != NULL);
2647 
      /* Keep using the saved local pointer from here on. */
2648  MyWalSnd = NULL;
2649 
2650  SpinLockAcquire(&walsnd->mutex);
2651  /* clear latch while holding the spinlock, so it can safely be read */
2652  walsnd->latch = NULL;
2653  /* Mark WalSnd struct as no longer being in use. */
2654  walsnd->pid = 0;
2655  SpinLockRelease(&walsnd->mutex);
2656 }

References Assert(), WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3234 of file walsender.c.

3235 {
      /*
       * Signal handler (referenced by WalSndSignals() per the
       * cross-references below): record the request in got_SIGUSR2 and set
       * this process's latch.  errno is saved on entry and restored on exit
       * so the interrupted code does not see a value changed by the handler.
       */
3236  int save_errno = errno;
3237 
3238  got_SIGUSR2 = true;
3239  SetLatch(MyLatch);
3240 
3241  errno = save_errno;
3242 }

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2456 of file walsender.c.

2457 {
2458  /*
2459  * Initialize the last reply timestamp. That enables timeout processing
2460  * from here on.
2461  */
2463  waiting_for_ping_response = false;
2464 
2465  /*
2466  * Loop until we reach the end of this timeline or the client requests to
2467  * stop streaming.
2468  */
2469  for (;;)
2470  {
2471  /* Clear any already-pending wakeups */
2473 
2475 
2476  /* Process any requests or signals received recently */
2477  if (ConfigReloadPending)
2478  {
2479  ConfigReloadPending = false;
2482  }
2483 
2484  /* Check for input from the client */
2486 
2487  /*
2488  * If we have received CopyDone from the client, sent CopyDone
2489  * ourselves, and the output buffer is empty, it's time to exit
2490  * streaming.
2491  */
2493  !pq_is_send_pending())
2494  break;
2495 
2496  /*
2497  * If we don't have any pending data in the output buffer, try to send
2498  * some more. If there is some, we don't bother to call send_data
2499  * again until we've flushed it ... but we'd better assume we are not
2500  * caught up.
2501  */
2502  if (!pq_is_send_pending())
2503  send_data();
2504  else
2505  WalSndCaughtUp = false;
2506 
2507  /* Try to flush pending output to the client */
2508  if (pq_flush_if_writable() != 0)
2509  WalSndShutdown();
2510 
2511  /* If nothing remains to be sent right now ... */
2513  {
2514  /*
2515  * If we're in catchup state, move to streaming. This is an
2516  * important state change for users to know about, since before
2517  * this point data loss might occur if the primary dies and we
2518  * need to failover to the standby. The state change is also
2519  * important for synchronous replication, since commits that
2520  * started to wait at that point might wait for some time.
2521  */
2523  {
2524  ereport(DEBUG1,
2525  (errmsg_internal("\"%s\" has now caught up with upstream server",
2526  application_name)));
2528  }
2529 
2530  /*
2531  * When SIGUSR2 arrives, we send any outstanding logs up to the
2532  * shutdown checkpoint record (i.e., the latest record), wait for
2533  * them to be replicated to the standby, and exit. This may be a
2534  * normal termination at shutdown, or a promotion, the walsender
2535  * is not sure which.
2536  */
2537  if (got_SIGUSR2)
2538  WalSndDone(send_data);
2539  }
2540 
2541  /* Check for replication timeout. */
2543 
2544  /* Send keepalive if the time has come */
2546 
2547  /*
2548  * Block if we have unsent data. XXX For logical replication, let
2549  * WalSndWaitForWal() handle any other blocking; idle receivers need
2550  * its additional actions. For physical replication, also block if
2551  * caught up; its send_data does not block.
2552  */
2553  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2556  {
2557  long sleeptime;
2558  int wakeEvents;
2559 
2561  wakeEvents = WL_SOCKET_READABLE;
2562  else
2563  wakeEvents = 0;
2564 
2565  /*
2566  * Use fresh timestamp, not last_processing, to reduce the chance
2567  * of reaching wal_sender_timeout before sending a keepalive.
2568  */
2570 
2571  if (pq_is_send_pending())
2572  wakeEvents |= WL_SOCKET_WRITEABLE;
2573 
2574  /* Sleep until something happens or we time out */
2575  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2576  }
2577  }
2578 }
char * application_name
Definition: guc.c:661
@ WAIT_EVENT_WAL_SENDER_MAIN
Definition: wait_event.h:48
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3119

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1357 of file walsender.c.

1358 {
      /*
       * Start a new message in ctx->out: clear the buffer, then append the
       * byte 'w', the LSN twice (dataStart and walEnd), and a 64-bit zero
       * that reserves the sendtime field.  "xid" is not used in the body.
       */
1359  /* can't have sync rep confused by sending the same LSN several times */
1360  if (!last_write)
1361  lsn = InvalidXLogRecPtr;
1362 
      /* Discard whatever the buffer held before. */
1363  resetStringInfo(ctx->out);
1364 
1365  pq_sendbyte(ctx->out, 'w');
1366  pq_sendint64(ctx->out, lsn); /* dataStart */
1367  pq_sendint64(ctx->out, lsn); /* walEnd */
1368 
1369  /*
1370  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1371  * reserve space here.
1372  */
1373  pq_sendint64(ctx->out, 0); /* sendtime */
1374 }
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 350 of file walsender.c.

351 {
     /*
      * Release everything held by CurrentResourceOwner and delete the owner.
      * Does nothing when CurrentResourceOwner is NULL.  isCommit is passed
      * through unchanged to each ResourceOwnerRelease() call.
      */
352  ResourceOwner resowner;
353 
354  if (CurrentResourceOwner == NULL)
355  return;
356 
357  /*
358  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359  * in a local variable and clear it first.
360  */
361  resowner = CurrentResourceOwner;
362  CurrentResourceOwner = NULL;
363 
364  /* Now we can release resources and delete it. */
     /*
      * Three release calls in phase order: before-locks, locks, after-locks.
      * The last argument (isTopLevel) is always true here.
      */
365  ResourceOwnerRelease(resowner,
366  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367  ResourceOwnerRelease(resowner,
368  RESOURCE_RELEASE_LOCKS, isCommit, true);
369  ResourceOwnerRelease(resowner,
370  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371  ResourceOwnerDelete(resowner);
372 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:49
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:48
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:50

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3189 of file walsender.c.

3190 {
      /*
       * Set the needreload flag on every walsender slot that currently has an
       * owning process (pid != 0).  Each slot is examined and updated while
       * holding that slot's spinlock.  Referenced by
       * KeepFileRestoredFromArchive() per the cross-references below.
       */
3191  int i;
3192 
3193  for (i = 0; i < max_wal_senders; i++)
3194  {
3195  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3196 
3197  SpinLockAcquire(&walsnd->mutex);
      /* Unused slot: drop the lock and move on without touching the flag. */
3198  if (walsnd->pid == 0)
3199  {
3200  SpinLockRelease(&walsnd->mutex);
3201  continue;
3202  }
3203  walsnd->needreload = true;
3204  SpinLockRelease(&walsnd->mutex);
3205  }
3206 }

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2660 of file walsender.c.

2662 {
      /*
       * Open the WAL segment file for nextSegNo, store the descriptor in
       * state->seg.ws_file, and report the timeline actually used through
       * *tli_p.  Returns normally only when the open succeeds; otherwise an
       * ERROR is raised.
       *
       * NOTE(review): source line 2661 (the tail of the signature) is absent
       * from this listing; the Doxygen header above gives the parameters.
       */
2663  char path[MAXPGPATH];
2664 
2665  /*-------
2666  * When reading from a historic timeline, and there is a timeline switch
2667  * within this segment, read from the WAL segment belonging to the new
2668  * timeline.
2669  *
2670  * For example, imagine that this server is currently on timeline 5, and
2671  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2672  * 0/13002088. In pg_wal, we have these files:
2673  *
2674  * ...
2675  * 000000040000000000000012
2676  * 000000040000000000000013
2677  * 000000050000000000000013
2678  * 000000050000000000000014
2679  * ...
2680  *
2681  * In this situation, when requested to send the WAL from segment 0x13, on
2682  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2683  * recovery prefers files from newer timelines, so if the segment was
2684  * restored from the archive on this server, the file belonging to the old
2685  * timeline, 000000040000000000000013, might not exist. Their contents are
2686  * equal up to the switchpoint, because at a timeline switch, the used
2687  * portion of the old segment is copied to the new file. -------
2688  */
2689  *tli_p = sendTimeLine;
      /*
       * NOTE(review): source line 2690 (the "if" guarding this block) is
       * absent; given the comment above and the References list it
       * presumably tests sendTimeLineIsHistoric -- confirm.
       */
2691  {
2692  XLogSegNo endSegNo;
2693 
      /* Segment containing the point up to which sendTimeLine is valid. */
2694  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2695  if (nextSegNo == endSegNo)
2696  *tli_p = sendTimeLineNextTLI;
2697  }
2698 
2699  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2700  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2701  if (state->seg.ws_file >= 0)
2702  return;
2703 
2704  /*
2705  * If the file is not found, assume it's because the standby asked for a
2706  * too old WAL segment that has already been removed or recycled.
2707  */
2708  if (errno == ENOENT)
2709  {
2710  char xlogfname[MAXFNAMELEN];
2711  int save_errno = errno;
2712 
      /* errno is saved before and restored after building the file name. */
2713  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2714  errno = save_errno;
2715  ereport(ERROR,
      /*
       * NOTE(review): source line 2716 is absent; per the References list it
       * is presumably "(errcode_for_file_access()," -- confirm.
       */
2717  errmsg("requested WAL segment %s has already been removed",
2718  xlogfname)));
2719  }
2720  else
2721  ereport(ERROR,
      /*
       * NOTE(review): source line 2722 is absent; presumably the same
       * errcode_for_file_access() argument as in the branch above.
       */
2723  errmsg("could not open file \"%s\": %m",
2724  path)));
2725 }
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1071
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3418 of file walsender.c.

3419 {
      /*
       * Record a new state in this process's walsender slot.  The current
       * value is compared without taking the spinlock, and the function
       * returns early when nothing would change; the store itself is done
       * under the slot's spinlock.
       */
3420  WalSnd *walsnd = MyWalSnd;
3421 
      /*
       * NOTE(review): source line 3422 is absent from this listing; per the
       * References below it is presumably Assert(am_walsender) -- confirm.
       */
3423 
3424  if (walsnd->state == state)
3425  return;
3426 
3427  SpinLockAcquire(&walsnd->mutex);
3428  walsnd->state = state;
3429  SpinLockRelease(&walsnd->mutex);
3430 }

References am_walsender, Assert(), WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3277 of file walsender.c.

3278 {
      /*
       * Look up or create the "Wal Sender Ctl" shared-memory structure and
       * point WalSndCtl at it.  The initialization block runs only when
       * ShmemInitStruct() reports that the structure did not already exist.
       */
3279  bool found;
3280  int i;
3281 
3282  WalSndCtl = (WalSndCtlData *)
3283  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3284 
3285  if (!found)
3286  {
3287  /* First time through, so initialize */
      /*
       * NOTE(review): source line 3288 is absent from this listing; the
       * References list names MemSet, presumably zeroing the whole structure
       * -- confirm.
       */
3289 
3290  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
      /*
       * NOTE(review): source line 3291 (the loop body) is absent; per the
       * References list it presumably calls
       * SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])) -- confirm.
       */
3292 
      /* Give every walsender slot an initialized spinlock. */
3293  for (i = 0; i < max_wal_senders; i++)
3294  {
3295  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3296 
3297  SpinLockInit(&walsnd->mutex);
3298  }
3299  }
3300 }
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
#define SpinLockInit(lock)
Definition: spin.h:60
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
Size WalSndShmemSize(void)
Definition: walsender.c:3265

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3265 of file walsender.c.

3266 {
      /*
       * Return the number of bytes of shared memory needed for the walsender
       * control structure: the WalSndCtlData header up to its walsnds member,
       * plus max_wal_senders entries of type WalSnd.
       */
      /* The initial 0 is overwritten by the next statement. */
3267  Size size = 0;
3268 
3269  size = offsetof(WalSndCtlData, walsnds);
      /*
       * NOTE(review): add_size()/mul_size() are presumably overflow-checked
       * arithmetic helpers -- confirm in shmem.c.
       */
3270  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3271 
3272  return size;
3273 }
#define offsetof(type, field)
Definition: c.h:727
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 230 of file walsender.c.

267 {
269 
270  /* Create a per-walsender data structure in shared memory */
272 
273  /*
274  * We don't currently need any ResourceOwner in a walsender process, but
275  * if we did, we could call CreateAuxProcessResourceOwner here.
276  */
277 
278  /*
279  * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
280  * a WAL sender process, postmaster will let us outlive the bgwriter and
281  * kill us last in the shutdown sequence, so we get a chance to stream all
282  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
283  * there's no going back, and we mustn't write any WAL records after this.
284  */
287 
288  /*
289  * If the client didn't specify a database to connect to, show in PGPROC
290  * that our advertised xmin should affect vacuum horizons in all
291  * databases. This allows physical replication clients to send hot
292  * standby feedback that will delay vacuum cleanup in all databases.
293  */
294  if (MyDatabaseId == InvalidOid)
295  {
297  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
300  LWLockRelease(ProcArrayLock);
301  }
302 
303  /* Initialize empty timestamp buffer for lag tracking. */
305 }
@ LW_EXCLUSIVE
Definition: lwlock.h:104
MemoryContext TopMemoryContext
Definition: mcxt.c:48
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:307
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:59
PROC_HDR * ProcGlobal
Definition: proc.c:80
uint8 statusFlags
Definition: proc.h:227
int pgxactoff
Definition: proc.h:186
uint8 * statusFlags
Definition: proc.h:371
static void InitWalSenderSlot(void)
Definition: walsender.c:2582

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3246 of file walsender.c.

3247 {
3248  /* Set up signal handlers */
3250  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3251  pqsignal(SIGTERM, die); /* request shutdown */
3252  /* SIGQUIT handler was already set up by InitPostmasterChild */
3253  InitializeTimeouts(); /* establishes SIGALRM handler */
3256  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3257  * shutdown */
3258 
3259  /* Reset some signals that are accepted by postmaster but not here */
3261 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
Definition: pg_test_fsync.c:95
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2962
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:638
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:180
void InitializeTimeouts(void)
Definition: timeout.c:474
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3234
#define SIGCHLD
Definition: win32_port.h:177
#define SIGHUP
Definition: win32_port.h:167
#define SIG_DFL
Definition: win32_port.h:162
#define SIGPIPE
Definition: win32_port.h:172
#define SIGUSR1
Definition: win32_port.h:179
#define SIGUSR2
Definition: win32_port.h:180
#define SIG_IGN
Definition: win32_port.h:164

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1480 of file walsender.c.

1482 {
1483  static TimestampTz sendTime = 0;
1485  bool pending_writes = false;
1486  bool end_xact = ctx->end_xact;
1487 
1488  /*
1489  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1490  * avoid flooding the lag tracker when we commit frequently.
1491  *
1492  * We don't have a mechanism to get the ack for any LSN other than end
1493  * xact LSN from the downstream. So, we track lag only for end of
1494  * transaction LSN.
1495  */
1496 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1497  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1499  {
1500  LagTrackerWrite(lsn, now);
1501  sendTime = now;
1502  }
1503 
1504  /*
1505  * When skipping empty transactions in synchronous replication, we send a
1506  * keepalive message to avoid delaying such transactions.
1507  *
1508  * It is okay to check sync_standbys_defined flag without lock here as in
1509  * the worst case we will just send an extra keepalive message when it is
1510  * really not required.
1511  */
1512  if (skipped_xact &&
1513  SyncRepRequested() &&
1514  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1515  {
1516  WalSndKeepalive(false, lsn);
1517 
1518  /* Try to flush pending output to the client */
1519  if (pq_flush_if_writable() != 0)
1520  WalSndShutdown();
1521 
1522  /* If we have pending write here, make sure it's actually flushed */
1523  if (pq_is_send_pending())
1524  pending_writes = true;
1525  }
1526 
1527  /*
1528  * Process pending writes if any or try to send a keepalive if required.
1529  * We don't need to try sending keep alive messages at the transaction end
1530  * as that will be done at a later point in time. This is required only
1531  * for large transactions where we don't send any changes to the
1532  * downstream and the receiver can timeout due to that.
1533  */
1534  if (pending_writes || (!end_xact &&
1536  wal_sender_timeout / 2)))
1538 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1705
#define SyncRepRequested()
Definition: syncrep.h:19
static void ProcessPendingWrites(void)
Definition: walsender.c:1426
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3705

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3337 of file walsender.c.

3338 {
3339  WaitEvent event;
3340 
3341  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3342  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3343  (event.events & WL_POSTMASTER_DEATH))
3344  proc_exit(1);
3345 }
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:947
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1320
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:168
uint32 events
Definition: latch.h:146

References WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, ModifyWaitEvent(), proc_exit(), WaitEventSetWait(), and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1548 of file walsender.c.

1549 {
1550  int wakeEvents;
1551  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1552 
1553  /*
1554  * Fast path to avoid acquiring the spinlock in case we already know we
1555  * have enough WAL available. This is particularly interesting if we're
1556  * far behind.
1557  */
1558  if (RecentFlushPtr != InvalidXLogRecPtr &&
1559  loc <= RecentFlushPtr)
1560  return RecentFlushPtr;
1561 
1562  /* Get a more recent flush pointer. */
1563  if (!RecoveryInProgress())
1564  RecentFlushPtr = GetFlushRecPtr(NULL);
1565  else
1566  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1567 
1568  for (;;)
1569  {
1570  long sleeptime;
1571 
1572  /* Clear any already-pending wakeups */
1574 
1576 
1577  /* Process any requests or signals received recently */
1578  if (ConfigReloadPending)
1579  {
1580  ConfigReloadPending = false;
1583  }
1584 
1585  /* Check for input from the client */
1587 
1588  /*
1589  * If we're shutting down, trigger pending WAL to be written out,
1590  * otherwise we'd possibly end up waiting for WAL that never gets
1591  * written, because walwriter has shut down already.
1592  */
1593  if (got_STOPPING)
1595 
1596  /* Update our idea of the currently flushed position. */
1597  if (!RecoveryInProgress())
1598  RecentFlushPtr = GetFlushRecPtr(NULL);
1599  else
1600  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1601 
1602  /*
1603  * If postmaster asked us to stop, don't wait anymore.
1604  *
1605  * It's important to do this check after the recomputation of
1606  * RecentFlushPtr, so we can send all remaining data before shutting
1607  * down.
1608  */
1609  if (got_STOPPING)
1610  break;
1611 
1612  /*
1613  * We only send regular messages to the client for full decoded
1614  * transactions, but synchronous replication and walsender shutdown
1615  * may be waiting for a later location. So, before sleeping, we
1616  * send a ping containing the flush location. If the receiver is
1617  * otherwise idle, this keepalive will trigger a reply. Processing the
1618  * reply will update these MyWalSnd locations.
1619  */
1620  if (MyWalSnd->flush < sentPtr &&
1621  MyWalSnd->write < sentPtr &&
1624 
1625  /* check whether we're done */
1626  if (loc <= RecentFlushPtr)
1627  break;
1628 
1629  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1630  WalSndCaughtUp = true;
1631 
1632  /*
1633  * Try to flush any pending output to the client.
1634  */
1635  if (pq_flush_if_writable() != 0)
1636  WalSndShutdown();
1637 
1638  /*
1639  * If we have received CopyDone from the client, sent CopyDone
1640  * ourselves, and the output buffer is empty, it's time to exit
1641  * streaming, so fail the current WAL fetch request.
1642  */
1644  !pq_is_send_pending())
1645  break;
1646 
1647  /* die if timeout was reached */
1649 
1650  /* Send keepalive if the time has come */
1652 
1653  /*
1654  * Sleep until something happens or we time out. Also wait for the
1655  * socket becoming writable, if there's still pending output.
1656  * Otherwise we might sit on sendable output data while waiting for
1657  * new WAL to be generated. (But if we have nothing to send, we don't
1658  * want to wake on socket-writable.)
1659  */
1661 
1662  wakeEvents = WL_SOCKET_READABLE;
1663 
1664  if (pq_is_send_pending())
1665  wakeEvents |= WL_SOCKET_WRITEABLE;
1666 
1667  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1668  }
1669 
1670  /* reactivate latch so WalSndLoop knows to continue */
1671  SetLatch(MyLatch);
1672  return RecentFlushPtr;
1673 }
@ WAIT_EVENT_WAL_SENDER_WAIT_WAL
Definition: wait_event.h:68
bool XLogBackgroundFlush(void)
Definition: xlog.c:2699

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3380 of file walsender.c.

3381 {
3382  for (;;)
3383  {
3384  int i;
3385  bool all_stopped = true;
3386 
3387  for (i = 0; i < max_wal_senders; i++)
3388  {
3389  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3390 
3391  SpinLockAcquire(&walsnd->mutex);
3392 
3393  if (walsnd->pid == 0)
3394  {
3395  SpinLockRelease(&walsnd->mutex);
3396  continue;
3397  }
3398 
3399  if (walsnd->state != WALSNDSTATE_STOPPING)
3400  {
3401  all_stopped = false;
3402  SpinLockRelease(&walsnd->mutex);
3403  break;
3404  }
3405  SpinLockRelease(&walsnd->mutex);
3406  }
3407 
3408  /* safe to leave if confirmation is done for all WAL senders */
3409  if (all_stopped)
3410  return;
3411 
3412  pg_usleep(10000L); /* wait for 10 msec */
3413  }
3414 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3309 of file walsender.c.

3310 {
3311  int i;
3312 
3313  for (i = 0; i < max_wal_senders; i++)
3314  {
3315  Latch *latch;
3316  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3317 
3318  /*
3319  * Get latch pointer with spinlock held, for the unlikely case that
3320  * pointer reads aren't atomic (as they're 8 bytes).
3321  */
3322  SpinLockAcquire(&walsnd->mutex);
3323  latch = walsnd->latch;
3324  SpinLockRelease(&walsnd->mutex);
3325 
3326  if (latch != NULL)
3327  SetLatch(latch);
3328  }
3329 }
Definition: latch.h:111

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1384 of file walsender.c.

1386 {
1387  TimestampTz now;
1388 
1389  /*
1390  * Fill the send timestamp last, so that it is taken as late as possible.
1391  * This is somewhat ugly, but the protocol is set as it's already used for
1392  * several releases by streaming physical replication.
1393  */
1396  pq_sendint64(&tmpbuf, now);
1397  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1398  tmpbuf.data, sizeof(int64));
1399 
1400  /* output previously gathered data in a CopyData packet */
1401  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1402 
1404 
1405  /* Try to flush pending output to the client */
1406  if (pq_flush_if_writable() != 0)
1407  WalSndShutdown();
1408 
1409  /* Try taking fast path unless we get too close to walsender timeout. */
1411  wal_sender_timeout / 2) &&
1412  !pq_is_send_pending())
1413  {
1414  return;
1415  }
1416 
1417  /* If we have pending write here, go to slow path */
1419 }

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3038 of file walsender.c.

3039 {
3040  XLogRecord *record;
3041  char *errm;
3042 
3043  /*
3044  * We'll use the current flush point to determine whether we've caught up.
3045  * This variable is static in order to cache it across calls. Caching is
3046  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3047  * spinlock.
3048  */
3049  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3050 
3051  /*
3052  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3053  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3054  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3055  * didn't wait - i.e. when we're shutting down.
3056  */
3057  WalSndCaughtUp = false;
3058 
3059  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3060 
3061  /* xlog record was invalid */
3062  if (errm != NULL)
3063  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3064  errm);
3065 
3066  if (record != NULL)
3067  {
3068  /*
3069  * Note the lack of any call to LagTrackerWrite() which is handled by
3070  * WalSndUpdateProgress which is called by output plugin through
3071  * logical decoding write api.
3072  */
3074 
3076  }
3077 
3078  /*
3079  * If first time through in this session, initialize flushPtr. Otherwise,
3080  * we only need to update flushPtr if EndRecPtr is past it.
3081  */
3082  if (flushPtr == InvalidXLogRecPtr)
3083  flushPtr = GetFlushRecPtr(NULL);
3084  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3085  flushPtr = GetFlushRecPtr(NULL);
3086 
3087  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3088  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3089  WalSndCaughtUp = true;
3090 
3091  /*
3092  * If we're caught up and have been requested to stop, have WalSndLoop()
3093  * terminate the connection in an orderly manner, after writing out all
3094  * the pending data.
3095  */
3097  got_SIGUSR2 = true;
3098 
3099  /* Update shared memory status */
3100  {
3101  WalSnd *walsnd = MyWalSnd;
3102 
3103  SpinLockAcquire(&walsnd->mutex);
3104  walsnd->sentPtr = sentPtr;
3105  SpinLockRelease(&walsnd->mutex);
3106  }
3107 }
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:91
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:418

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2738 of file walsender.c.

2739 {
2740  XLogRecPtr SendRqstPtr;
2741  XLogRecPtr startptr;
2742  XLogRecPtr endptr;
2743  Size nbytes;
2744  XLogSegNo segno;
2745  WALReadError errinfo;
2746 
2747  /* If requested switch the WAL sender to the stopping state. */
2748  if (got_STOPPING)
2750 
2752  {
2753  WalSndCaughtUp = true;
2754  return;
2755  }
2756 
2757  /* Figure out how far we can safely send the WAL. */
2759  {
2760  /*
2761  * Streaming an old timeline that's in this server's history, but is
2762  * not the one we're currently inserting or replaying. It can be
2763  * streamed up to the point where we switched off that timeline.
2764  */
2765  SendRqstPtr = sendTimeLineValidUpto;
2766  }
2767  else if (am_cascading_walsender)
2768  {
2769  TimeLineID SendRqstTLI;
2770 
2771  /*
2772  * Streaming the latest timeline on a standby.
2773  *
2774  * Attempt to send all WAL that has already been replayed, so that we
2775  * know it's valid. If we're receiving WAL through streaming
2776  * replication, it's also OK to send any WAL that has been received
2777  * but not replayed.
2778  *
2779  * The timeline we're recovering from can change, or we can be
2780  * promoted. In either case, the current timeline becomes historic. We
2781  * need to detect that so that we don't try to stream past the point
2782  * where we switched to another timeline. We check for promotion or
2783  * timeline switch after calculating FlushPtr, to avoid a race
2784  * condition: if the timeline becomes historic just after we checked
2785  * that it was still current, it's still OK to stream it up to the
2786  * FlushPtr that was calculated before it became historic.
2787  */
2788  bool becameHistoric = false;
2789 
2790  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
2791 
2792  if (!RecoveryInProgress())
2793  {
2794  /* We have been promoted. */
2795  SendRqstTLI = GetWALInsertionTimeLine();
2796  am_cascading_walsender = false;
2797  becameHistoric = true;
2798  }
2799  else
2800  {
2801  /*
2802  * Still a cascading standby. But is the timeline we're sending
2803  * still the one recovery is recovering from?
2804  */
2805  if (sendTimeLine != SendRqstTLI)
2806  becameHistoric = true;
2807  }
2808 
2809  if (becameHistoric)
2810  {
2811  /*
2812  * The timeline we were sending has become historic. Read the
2813  * timeline history file of the new timeline to see where exactly
2814  * we forked off from the timeline we were sending.
2815  */
2816  List *history;
2817 
2818  history = readTimeLineHistory(SendRqstTLI);
2820 
2822  list_free_deep(history);
2823 
2824  sendTimeLineIsHistoric = true;
2825 
2826  SendRqstPtr = sendTimeLineValidUpto;
2827  }
2828  }
2829  else
2830  {
2831  /*
2832  * Streaming the current timeline on a primary.
2833  *
2834  * Attempt to send all data that's already been written out and
2835  * fsync'd to disk. We cannot go further than what's been written out
2836  * given the current implementation of WALRead(). And in any case
2837  * it's unsafe to send WAL that is not securely down to disk on the
2838  * primary: if the primary subsequently crashes and restarts, standbys
2839  * must not have applied any WAL that got lost on the primary.
2840  */
2841  SendRqstPtr = GetFlushRecPtr(NULL);
2842  }
2843 
2844  /*
2845  * Record the current system time as an approximation of the time at which
2846  * this WAL location was written for the purposes of lag tracking.
2847  *
2848  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2849  * is flushed and we could get that time as well as the LSN when we call
2850  * GetFlushRecPtr() above (and likewise for the cascading standby
2851  * equivalent), but rather than putting any new code into the hot WAL path
2852  * it seems good enough to capture the time here. We should reach this
2853  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2854  * may take some time, we read the WAL flush pointer and take the time
2855  * very close together here so that we'll get a later position if it is
2856  * still moving.
2857  *
2858  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2859  * this gives us a cheap approximation for the WAL flush time for this
2860  * LSN.
2861  *
2862  * Note that the LSN is not necessarily the LSN for the data contained in
2863  * the present message; it's the end of the WAL, which might be further
2864  * ahead. All the lag tracking machinery cares about is finding out when
2865  * that arbitrary LSN is eventually reported as written, flushed and
2866  * applied, so that it can measure the elapsed time.
2867  */
2868  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2869 
2870  /*
2871  * If this is a historic timeline and we've reached the point where we
2872  * forked to the next timeline, stop streaming.
2873  *
2874  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2875  * startup process will normally replay all WAL that has been received
2876  * from the primary, before promoting, but if the WAL streaming is
2877  * terminated at a WAL page boundary, the valid portion of the timeline
2878  * might end in the middle of a WAL record. We might've already sent the
2879  * first half of that partial WAL record to the cascading standby, so that
2880  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2881  * replay the partial WAL record either, so it can still follow our
2882  * timeline switch.
2883  */
2885  {
2886  /* close the current file. */
2887  if (xlogreader->seg.ws_file >= 0)
2889 
2890  /* Send CopyDone */
2891  pq_putmessage_noblock('c', NULL, 0);
2892  streamingDoneSending = true;
2893 
2894  WalSndCaughtUp = true;
2895 
2896  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2899  return;
2900  }
2901 
2902  /* Do we have any work to do? */
2903  Assert(sentPtr <= SendRqstPtr);
2904  if (SendRqstPtr <= sentPtr)
2905  {
2906  WalSndCaughtUp = true;
2907  return;
2908  }
2909 
2910  /*
2911  * Figure out how much to send in one message. If there's no more than
2912  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2913  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2914  *
2915  * The rounding is not only for performance reasons. Walreceiver relies on
2916  * the fact that we never split a WAL record across two messages. Since a
2917  * long WAL record is split at page boundary into continuation records,
2918  * page boundary is always a safe cut-off point. We also assume that
2919  * SendRqstPtr never points to the middle of a WAL record.
2920  */
2921  startptr = sentPtr;
2922  endptr = startptr;
2923  endptr += MAX_SEND_SIZE;
2924 
2925  /* if we went beyond SendRqstPtr, back off */
2926  if (SendRqstPtr <= endptr)
2927  {
2928  endptr = SendRqstPtr;
2930  WalSndCaughtUp = false;
2931  else
2932  WalSndCaughtUp = true;
2933  }
2934  else
2935  {
2936  /* round down to page boundary. */
2937  endptr -= (endptr % XLOG_BLCKSZ);
2938  WalSndCaughtUp = false;
2939  }
2940 
2941  nbytes = endptr - startptr;
2942  Assert(nbytes <= MAX_SEND_SIZE);
2943 
2944  /*
2945  * OK to read and send the slice.
2946  */
2948  pq_sendbyte(&output_message, 'w');
2949 
2950  pq_sendint64(&output_message, startptr); /* dataStart */
2951  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2952  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2953 
2954  /*
2955  * Read the log directly into the output buffer to avoid extra memcpy
2956  * calls.
2957  */
2959 
2960 retry:
2961  if (!WALRead(xlogreader,
2963  startptr,
2964  nbytes,
2965  xlogreader->seg.ws_tli, /* Pass the current TLI because
2966  * only WalSndSegmentOpen controls
2967  * whether new TLI is needed. */
2968  &errinfo))
2969  WALReadRaiseError(&errinfo);
2970 
2971  /* See logical_read_xlog_page(). */
2972  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2974 
2975  /*
2976  * During recovery, the currently-open WAL file might be replaced with the
2977  * file of the same name retrieved from archive. So we always need to
2978  * check what we read was valid after reading into the buffer. If it's
2979  * invalid, we try to open and read the file again.
2980  */
2982  {
2983  WalSnd *walsnd = MyWalSnd;
2984  bool reload;
2985 
2986  SpinLockAcquire(&walsnd->mutex);
2987  reload = walsnd->needreload;
2988  walsnd->needreload = false;
2989  SpinLockRelease(&walsnd->mutex);
2990 
2991  if (reload && xlogreader->seg.ws_file >= 0)
2992  {
2994 
2995  goto retry;
2996  }
2997  }
2998 
2999  output_message.len += nbytes;
3001 
3002  /*
3003  * Fill the send timestamp last, so that it is taken as late as possible.
3004  */
3007  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3008  tmpbuf.data, sizeof(int64));
3009 
3011 
3012  sentPtr = endptr;
3013 
3014  /* Update shared memory status */
3015  {
3016  WalSnd *walsnd = MyWalSnd;
3017 
3018  SpinLockAcquire(&walsnd->mutex);
3019  walsnd->sentPtr = sentPtr;
3020  SpinLockRelease(&walsnd->mutex);
3021  }
3022 
3023  /* Report progress of XLOG streaming in PS display */
3025  {
3026  char activitymsg[50];
3027 
3028  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3030  set_ps_display(activitymsg);
3031  }
3032 }
bool update_process_title
Definition: ps_status.c:36
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:271
#define MAX_SEND_SIZE
Definition: walsender.c:107

References am_cascading_walsender, Assert(), CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 119 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 220 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

Definition at line 126 of file walsender.c.

Referenced by exec_replication_command().

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 198 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

StringInfoData output_message
static

Definition at line 158 of file walsender.c.

Referenced by exec_replication_command(), WalSndKeepalive(), and XLogSendPhysical().

◆ replication_active

volatile sig_atomic_t replication_active = false
static

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 181 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 131 of file walsender.c.

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader