PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void ProcessPendingWrites (void)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 208 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 107 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 226 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1030 of file walsender.c.

1031 {
1032  const char *snapshot_name = NULL;
1033  char xloc[MAXFNAMELEN];
1034  char *slot_name;
1035  bool reserve_wal = false;
1036  bool two_phase = false;
1037  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1038  DestReceiver *dest;
1039  TupOutputState *tstate;
1040  TupleDesc tupdesc;
1041  Datum values[4];
1042  bool nulls[4] = {0};
1043 
1045 
1046  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1047 
1048  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1049  {
1050  ReplicationSlotCreate(cmd->slotname, false,
1052  false);
1053  }
1054  else
1055  {
1057 
1058  /*
1059  * Initially create persistent slot as ephemeral - that allows us to
1060  * nicely handle errors during initialization because it'll get
1061  * dropped if this transaction fails. We'll make it persistent at the
1062  * end. Temporary slots can be created as temporary from beginning as
1063  * they get dropped on error as well.
1064  */
1065  ReplicationSlotCreate(cmd->slotname, true,
1067  two_phase);
1068  }
1069 
1070  if (cmd->kind == REPLICATION_KIND_LOGICAL)
1071  {
1073  bool need_full_snapshot = false;
1074 
1075  /*
1076  * Do options check early so that we can bail before calling the
1077  * DecodingContextFindStartpoint which can take long time.
1078  */
1079  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1080  {
1081  if (IsTransactionBlock())
1082  ereport(ERROR,
1083  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1084  (errmsg("%s must not be called inside a transaction",
1085  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1086 
1087  need_full_snapshot = true;
1088  }
1089  else if (snapshot_action == CRS_USE_SNAPSHOT)
1090  {
1091  if (!IsTransactionBlock())
1092  ereport(ERROR,
1093  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1094  (errmsg("%s must be called inside a transaction",
1095  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1096 
1098  ereport(ERROR,
1099  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1100  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1101  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1102  if (!XactReadOnly)
1103  ereport(ERROR,
1104  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1105  (errmsg("%s must be called in a read only transaction",
1106  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1107 
1108  if (FirstSnapshotSet)
1109  ereport(ERROR,
1110  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1111  (errmsg("%s must be called before any query",
1112  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1113 
1114  if (IsSubTransaction())
1115  ereport(ERROR,
1116  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1117  (errmsg("%s must not be called in a subtransaction",
1118  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1119 
1120  need_full_snapshot = true;
1121  }
1122 
1123  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1125  XL_ROUTINE(.page_read = logical_read_xlog_page,
1126  .segment_open = WalSndSegmentOpen,
1127  .segment_close = wal_segment_close),
1130 
1131  /*
1132  * Signal that we don't need the timeout mechanism. We're just
1133  * creating the replication slot and don't yet accept feedback
1134  * messages or send keepalives. As we possibly need to wait for
1135  * further WAL the walsender would otherwise possibly be killed too
1136  * soon.
1137  */
1139 
1140  /* build initial snapshot, might take a while */
1142 
1143  /*
1144  * Export or use the snapshot if we've been asked to do so.
1145  *
1146  * NB. We will convert the snapbuild.c kind of snapshot to normal
1147  * snapshot when doing this.
1148  */
1149  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1150  {
1151  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1152  }
1153  else if (snapshot_action == CRS_USE_SNAPSHOT)
1154  {
1155  Snapshot snap;
1156 
1159  }
1160 
1161  /* don't need the decoding context anymore */
1162  FreeDecodingContext(ctx);
1163 
1164  if (!cmd->temporary)
1166  }
1167  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1168  {
1170 
1172 
1173  /* Write this slot to disk if it's a permanent one. */
1174  if (!cmd->temporary)
1176  }
1177 
1178  snprintf(xloc, sizeof(xloc), "%X/%X",
1180 
1182 
1183  /*----------
1184  * Need a tuple descriptor representing four columns:
1185  * - first field: the slot name
1186  * - second field: LSN at which we became consistent
1187  * - third field: exported snapshot's name
1188  * - fourth field: output plugin
1189  *----------
1190  */
1191  tupdesc = CreateTemplateTupleDesc(4);
1192  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1193  TEXTOID, -1, 0);
1194  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1195  TEXTOID, -1, 0);
1196  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1197  TEXTOID, -1, 0);
1198  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1199  TEXTOID, -1, 0);
1200 
1201  /* prepare for projection of tuples */
1202  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1203 
1204  /* slot_name */
1205  slot_name = NameStr(MyReplicationSlot->data.name);
1206  values[0] = CStringGetTextDatum(slot_name);
1207 
1208  /* consistent wal location */
1209  values[1] = CStringGetTextDatum(xloc);
1210 
1211  /* snapshot name, or NULL if none */
1212  if (snapshot_name != NULL)
1213  values[2] = CStringGetTextDatum(snapshot_name);
1214  else
1215  nulls[2] = true;
1216 
1217  /* plugin, or NULL if none */
1218  if (cmd->plugin != NULL)
1219  values[3] = CStringGetTextDatum(cmd->plugin);
1220  else
1221  nulls[3] = true;
1222 
1223  /* send it to dest */
1224  do_tup_output(tstate, values, nulls);
1225  end_tup_output(tstate);
1226 
1228 }
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:85
#define NameStr(name)
Definition: c.h:682
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:93
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define ERROR
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:145
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2333
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2275
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2255
Assert(fmt[strlen(fmt) - 1] !='\n')
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:634
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:590
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:319
#define NIL
Definition: pg_list.h:66
static bool two_phase
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:412
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotMarkDirty(void)
Definition: slot.c:796
void ReplicationSlotReserveWal(void)
Definition: slot.c:1156
void ReplicationSlotPersist(void)
Definition: slot.c:813
ReplicationSlot * MyReplicationSlot
Definition: slot.c:98
void ReplicationSlotSave(void)
Definition: slot.c:778
void ReplicationSlotRelease(void)
Definition: slot.c:547
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:252
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:664
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:565
bool FirstSnapshotSet
Definition: snapmgr.c:150
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2271
PGPROC * MyProc
Definition: proc.c:68
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:84
ReplicationSlotPersistentData data
Definition: slot.h:147
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2647
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:963
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1371
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1467
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:902
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1344
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:81
int XactIsoLevel
Definition: xact.c:78
bool IsSubTransaction(void)
Definition: xact.c:4869
bool IsTransactionBlock(void)
Definition: xact.c:4796
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:859

References Assert(), begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1234 of file walsender.c.

1235 {
1236  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1237 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:641

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1669 of file walsender.c.

1670 {
1671  int parse_rc;
1672  Node *cmd_node;
1673  const char *cmdtag;
1674  MemoryContext cmd_context;
1675  MemoryContext old_context;
1676 
1677  /*
1678  * If WAL sender has been told that shutdown is getting close, switch its
1679  * status accordingly to handle the next replication commands correctly.
1680  */
1681  if (got_STOPPING)
1683 
1684  /*
1685  * Throw error if in stopping mode. We need prevent commands that could
1686  * generate WAL while the shutdown checkpoint is being written. To be
1687  * safe, we just prohibit all new commands.
1688  */
1690  ereport(ERROR,
1691  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1692  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1693 
1694  /*
1695  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1696  * command arrives. Clean up the old stuff if there's anything.
1697  */
1699 
1701 
1702  /*
1703  * Prepare to parse and execute the command.
1704  */
1706  "Replication command context",
1708  old_context = MemoryContextSwitchTo(cmd_context);
1709 
1710  replication_scanner_init(cmd_string);
1711 
1712  /*
1713  * Is it a WalSender command?
1714  */
1716  {
1717  /* Nope; clean up and get out. */
1719 
1720  MemoryContextSwitchTo(old_context);
1721  MemoryContextDelete(cmd_context);
1722 
1723  /* XXX this is a pretty random place to make this check */
1724  if (MyDatabaseId == InvalidOid)
1725  ereport(ERROR,
1726  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1727  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1728 
1729  /* Tell the caller that this wasn't a WalSender command. */
1730  return false;
1731  }
1732 
1733  /*
1734  * Looks like a WalSender command, so parse it.
1735  */
1736  parse_rc = replication_yyparse();
1737  if (parse_rc != 0)
1738  ereport(ERROR,
1739  (errcode(ERRCODE_SYNTAX_ERROR),
1740  errmsg_internal("replication command parser returned %d",
1741  parse_rc)));
1743 
1744  cmd_node = replication_parse_result;
1745 
1746  /*
1747  * Report query to various monitoring facilities. For this purpose, we
1748  * report replication commands just like SQL commands.
1749  */
1750  debug_query_string = cmd_string;
1751 
1753 
1754  /*
1755  * Log replication command if log_replication_commands is enabled. Even
1756  * when it's disabled, log the command with DEBUG1 level for backward
1757  * compatibility.
1758  */
1760  (errmsg("received replication command: %s", cmd_string)));
1761 
1762  /*
1763  * Disallow replication commands in aborted transaction blocks.
1764  */
1766  ereport(ERROR,
1767  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1768  errmsg("current transaction is aborted, "
1769  "commands ignored until end of transaction block")));
1770 
1772 
1773  /*
1774  * Allocate buffers that will be used for each outgoing and incoming
1775  * message. We do this just once per command to reduce palloc overhead.
1776  */
1780 
1781  switch (cmd_node->type)
1782  {
1783  case T_IdentifySystemCmd:
1784  cmdtag = "IDENTIFY_SYSTEM";
1785  set_ps_display(cmdtag);
1786  IdentifySystem();
1787  EndReplicationCommand(cmdtag);
1788  break;
1789 
1790  case T_ReadReplicationSlotCmd:
1791  cmdtag = "READ_REPLICATION_SLOT";
1792  set_ps_display(cmdtag);
1794  EndReplicationCommand(cmdtag);
1795  break;
1796 
1797  case T_BaseBackupCmd:
1798  cmdtag = "BASE_BACKUP";
1799  set_ps_display(cmdtag);
1800  PreventInTransactionBlock(true, cmdtag);
1801  SendBaseBackup((BaseBackupCmd *) cmd_node);
1802  EndReplicationCommand(cmdtag);
1803  break;
1804 
1805  case T_CreateReplicationSlotCmd:
1806  cmdtag = "CREATE_REPLICATION_SLOT";
1807  set_ps_display(cmdtag);
1809  EndReplicationCommand(cmdtag);
1810  break;
1811 
1812  case T_DropReplicationSlotCmd:
1813  cmdtag = "DROP_REPLICATION_SLOT";
1814  set_ps_display(cmdtag);
1816  EndReplicationCommand(cmdtag);
1817  break;
1818 
1819  case T_StartReplicationCmd:
1820  {
1821  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1822 
1823  cmdtag = "START_REPLICATION";
1824  set_ps_display(cmdtag);
1825  PreventInTransactionBlock(true, cmdtag);
1826 
1827  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1828  StartReplication(cmd);
1829  else
1831 
1832  /* dupe, but necessary per libpqrcv_endstreaming */
1833  EndReplicationCommand(cmdtag);
1834 
1835  Assert(xlogreader != NULL);
1836  break;
1837  }
1838 
1839  case T_TimeLineHistoryCmd:
1840  cmdtag = "TIMELINE_HISTORY";
1841  set_ps_display(cmdtag);
1842  PreventInTransactionBlock(true, cmdtag);
1844  EndReplicationCommand(cmdtag);
1845  break;
1846 
1847  case T_VariableShowStmt:
1848  {
1850  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1851 
1852  cmdtag = "SHOW";
1853  set_ps_display(cmdtag);
1854 
1855  /* syscache access needs a transaction environment */
1857  GetPGVariable(n->name, dest);
1859  EndReplicationCommand(cmdtag);
1860  }
1861  break;
1862 
1863  default:
1864  elog(ERROR, "unrecognized replication command node tag: %u",
1865  cmd_node->type);
1866  }
1867 
1868  /* done */
1869  MemoryContextSwitchTo(old_context);
1870  MemoryContextDelete(cmd_context);
1871 
1872  /*
1873  * We need not update ps display or pg_stat_activity, because PostgresMain
1874  * will reset those to "idle". But we must reset debug_query_string to
1875  * ensure it doesn't become a dangling pointer.
1876  */
1877  debug_query_string = NULL;
1878 
1879  return true;
1880 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:964
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:222
int errmsg_internal(const char *fmt,...)
Definition: elog.c:993
int errcode(int sqlerrcode)
Definition: elog.c:695
#define LOG
Definition: elog.h:27
#define DEBUG1
Definition: elog.h:26
Oid MyDatabaseId
Definition: globals.c:89
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:124
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:376
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
const char * debug_query_string
Definition: postgres.c:82
#define InvalidOid
Definition: postgres_ext.h:36
void set_ps_display(const char *activity)
Definition: ps_status.c:342
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:725
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:118
NodeTag type
Definition: nodes.h:119
ReplicationKind kind
Definition: replnodes.h:82
WalSndState state
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:577
WalSnd * MyWalSnd
Definition: walsender.c:113
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:478
static StringInfoData tmpbuf
Definition: walsender.c:160
static void IdentifySystem(void)
Definition: walsender.c:395
static StringInfoData reply_message
Definition: walsender.c:159
void WalSndSetState(WalSndState state)
Definition: walsender.c:3405
static StringInfoData output_message
Definition: walsender.c:158
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
bool log_replication_commands
Definition: walsender.c:126
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1030
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1244
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1234
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:670
static XLogReaderState * xlogreader
Definition: walsender.c:138
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3469
void StartTransactionCommand(void)
Definition: xact.c:2925
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:397
void CommitTransactionCommand(void)
Definition: xact.c:3022

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog(), EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)
static

Definition at line 3146 of file walsender.c.

3147 {
3148  XLogRecPtr replayPtr;
3149  TimeLineID replayTLI;
3150  XLogRecPtr receivePtr;
3152  XLogRecPtr result;
3153 
3154  /*
3155  * We can safely send what's already been replayed. Also, if walreceiver
3156  * is streaming WAL from the same timeline, we can send anything that it
3157  * has streamed, but hasn't been replayed yet.
3158  */
3159 
3160  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3161  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3162 
3163  *tli = replayTLI;
3164 
3165  result = replayPtr;
3166  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3167  result = receivePtr;
3168 
3169  return result;
3170 }
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3199 of file walsender.c.

3200 {
3202 
3203  /*
3204  * If replication has not yet started, die like with SIGTERM. If
3205  * replication is active, only set a flag and wake up the main loop. It
3206  * will send any outstanding WAL, wait for it to be replicated to the
3207  * standby, and then exit gracefully.
3208  */
3209  if (!replication_active)
3210  kill(MyProcPid, SIGTERM);
3211  else
3212  got_STOPPING = true;
3213 }
int MyProcPid
Definition: globals.c:44
bool am_walsender
Definition: walsender.c:116
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
#define kill(pid, sig)
Definition: win32_port.h:482

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 395 of file walsender.c.

396 {
397  char sysid[32];
398  char xloc[MAXFNAMELEN];
399  XLogRecPtr logptr;
400  char *dbname = NULL;
401  DestReceiver *dest;
402  TupOutputState *tstate;
403  TupleDesc tupdesc;
404  Datum values[4];
405  bool nulls[4] = {0};
406  TimeLineID currTLI;
407 
408  /*
409  * Reply with a result set with one row, four columns. First col is system
410  * ID, second is timeline ID, third is current xlog location and the
411  * fourth contains the database name if we are connected to one.
412  */
413 
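414  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
415  GetSystemIdentifier());
416
417  am_cascading_walsender = RecoveryInProgress();
418  if (am_cascading_walsender)
419  logptr = GetStandbyFlushRecPtr(&currTLI);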
420  else
421  logptr = GetFlushRecPtr(&currTLI);
422 
423  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
424 
425  if (MyDatabaseId != InvalidOid)
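426  {
427  MemoryContext cur = CurrentMemoryContext;
428
429  /* syscache access needs a transaction env. */
430  StartTransactionCommand();
431  /* make dbname live outside TX context */
432  MemoryContextSwitchTo(cur);
433  dbname = get_database_name(MyDatabaseId);
434  CommitTransactionCommand();
435  /* CommitTransactionCommand switches to TopMemoryContext */
436  MemoryContextSwitchTo(cur);
437  }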
438
439  dest = CreateDestReceiver(DestRemoteSimple);
440
441  /* need a tuple descriptor representing four columns */
442  tupdesc = CreateTemplateTupleDesc(4);
443  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
444  TEXTOID, -1, 0);
445  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
446  INT8OID, -1, 0);
447  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
448  TEXTOID, -1, 0);
449  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
450  TEXTOID, -1, 0);
451 
452  /* prepare for projection of tuples */
453  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
454 
455  /* column 1: system identifier */
456  values[0] = CStringGetTextDatum(sysid);
457 
458  /* column 2: timeline */
459  values[1] = Int64GetDatum(currTLI);
460 
461  /* column 3: wal location */
462  values[2] = CStringGetTextDatum(xloc);
463 
464  /* column 4: database name, or NULL if none */
465  if (dbname)
466  values[3] = CStringGetTextDatum(dbname);
467  else
468  nulls[3] = true;
469 
470  /* send it to dest */
471  do_tup_output(tstate, values, nulls);
472 
473  end_tup_output(tstate);
474 }
#define UINT64_FORMAT
Definition: c.h:485
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2981
struct cursor * cur
Definition: ecpg.c:28
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1683
char * dbname
Definition: streamutil.c:51
bool am_cascading_walsender
Definition: walsender.c:117
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3146
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4181
bool RecoveryInProgress(void)
Definition: xlog.c:5912
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6077

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextSwitchTo(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2569 of file walsender.c.

2570 {
2571  int i;
2572 
2573  /*
2574  * WalSndCtl should be set up already (we inherit this by fork() or
2575  * EXEC_BACKEND mechanism from the postmaster).
2576  */
2577  Assert(WalSndCtl != NULL);
2578  Assert(MyWalSnd == NULL);
2579 
2580  /*
2581  * Find a free walsender slot and reserve it. This must not fail due to
2582  * the prior check for free WAL senders in InitProcess().
2583  */
2584  for (i = 0; i < max_wal_senders; i++)
2585  {
2586  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2587 
2588  SpinLockAcquire(&walsnd->mutex);
2589 
2590  if (walsnd->pid != 0)
2591  {
2592  SpinLockRelease(&walsnd->mutex);
2593  continue;
2594  }
2595  else
2596  {
2597  /*
2598  * Found a free slot. Reserve it for us.
2599  */
2600  walsnd->pid = MyProcPid;
2601  walsnd->state = WALSNDSTATE_STARTUP;
2602  walsnd->sentPtr = InvalidXLogRecPtr;
2603  walsnd->needreload = false;
2604  walsnd->write = InvalidXLogRecPtr;
2605  walsnd->flush = InvalidXLogRecPtr;
2606  walsnd->apply = InvalidXLogRecPtr;
2607  walsnd->writeLag = -1;
2608  walsnd->flushLag = -1;
2609  walsnd->applyLag = -1;
2610  walsnd->sync_standby_priority = 0;
2611  walsnd->latch = &MyProc->procLatch;
2612  walsnd->replyTime = 0;
2613  SpinLockRelease(&walsnd->mutex);
2614  /* don't need the lock anymore */
2615  MyWalSnd = (WalSnd *) walsnd;
2616 
2617  break;
2618  }
2619  }
2620 
2621  Assert(MyWalSnd != NULL);
2622 
2623  /* Arrange to clean up at walsender exit */
2624  on_shmem_exit(WalSndKill, 0);
2625 }
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int i
Definition: isn.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch procLatch
Definition: proc.h:170
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:122
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2629
WalSndCtlData * WalSndCtl
Definition: walsender.c:110
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert(), WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3756 of file walsender.c.

3757 {
3758  TimestampTz time = 0;
3759 
3760  /* Read all unread samples up to this LSN or end of buffer. */
3761  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3762  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3763  {
3764  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
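3765  lag_tracker->last_read[head] =
3766  lag_tracker->buffer[lag_tracker->read_heads[head]];
3767  lag_tracker->read_heads[head] =
3768  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3769  }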
3770 
3771  /*
3772  * If the lag tracker is empty, that means the standby has processed
3773  * everything we've ever sent so we should now clear 'last_read'. If we
3774  * didn't do that, we'd risk using a stale and irrelevant sample for
3775  * interpolation at the beginning of the next burst of WAL after a period
3776  * of idleness.
3777  */
3778  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3779  lag_tracker->last_read[head].time = 0;
3780 
3781  if (time > now)
3782  {
3783  /* If the clock somehow went backwards, treat as not found. */
3784  return -1;
3785  }
3786  else if (time == 0)
3787  {
3788  /*
3789  * We didn't cross a time. If there is a future sample that we
3790  * haven't reached yet, and we've already reached at least one sample,
3791  * let's interpolate the local flushed time. This is mainly useful
3792  * for reporting a completely stuck apply position as having
3793  * increasing lag, since otherwise we'd have to wait for it to
3794  * eventually start moving again and cross one of our samples before
3795  * we can show the lag increasing.
3796  */
3797  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3798  {
3799  /* There are no future samples, so we can't interpolate. */
3800  return -1;
3801  }
3802  else if (lag_tracker->last_read[head].time != 0)
3803  {
3804  /* We can interpolate between last_read and the next sample. */
3805  double fraction;
3806  WalTimeSample prev = lag_tracker->last_read[head];
3807  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3808
3809  if (lsn < prev.lsn)
3810  {
3811  /*
3812  * Reported LSNs shouldn't normally go backwards, but it's
3813  * possible when there is a timeline change. Treat as not
3814  * found.
3815  */
3816  return -1;
3817  }
3818 
3819  Assert(prev.lsn < next.lsn);
3820 
3821  if (prev.time > next.time)
3822  {
3823  /* If the clock somehow went backwards, treat as not found. */
3824  return -1;
3825  }
3826 
3827  /* See how far we are between the previous and next samples. */
3828  fraction =
3829  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3830 
3831  /* Scale the local flush time proportionally. */
3832  time = (TimestampTz)
3833  ((double) prev.time + (next.time - prev.time) * fraction);
3834  }
3835  else
3836  {
3837  /*
3838  * We have only a future sample, implying that we were entirely
3839  * caught up but and now there is a new burst of WAL and the
3840  * standby hasn't processed the first sample yet. Until the
3841  * standby reaches the future sample the best we can do is report
3842  * the hypothetical lag if that sample were to be replayed now.
3843  */
3844  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3845  }
3846  }
3847 
3848  /* Return the elapsed time since local flush time in microseconds. */
3849  Assert(time != 0);
3850  return now - time;
3851 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1537
static int32 next
Definition: blutils.c:219
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:217
int write_head
Definition: walsender.c:215
TimestampTz time
Definition: walsender.c:204
XLogRecPtr lsn
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:220
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208

References Assert(), LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3691 of file walsender.c.

3692 {
3693  bool buffer_full;
3694  int new_write_head;
3695  int i;
3696 
3697  if (!am_walsender)
3698  return;
3699 
3700  /*
3701  * If the lsn hasn't advanced since last time, then do nothing. This way
3702  * we only record a new sample when new WAL has been written.
3703  */
3704  if (lag_tracker->last_lsn == lsn)
3705  return;
3706  lag_tracker->last_lsn = lsn;
3707 
3708  /*
3709  * If advancing the write head of the circular buffer would crash into any
3710  * of the read heads, then the buffer is full. In other words, the
3711  * slowest reader (presumably apply) is the one that controls the release
3712  * of space.
3713  */
3714  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3715  buffer_full = false;
3716  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3717  {
3718  if (new_write_head == lag_tracker->read_heads[i])
3719  buffer_full = true;
3720  }
3721 
3722  /*
3723  * If the buffer is full, for now we just rewind by one slot and overwrite
3724  * the last sample, as a simple (if somewhat uneven) way to lower the
3725  * sampling rate. There may be better adaptive compaction algorithms.
3726  */
3727  if (buffer_full)
3728  {
3729  new_write_head = lag_tracker->write_head;
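3730  if (lag_tracker->write_head > 0)
3731  lag_tracker->write_head--;
3732  else
3733  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3734  }
3735
3736  /* Store a sample at the current write head position. */
3737  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3738  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;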
3739  lag_tracker->write_head = new_write_head;
3740 }
XLogRecPtr last_lsn
Definition: walsender.c:213
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *  state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 902 of file walsender.c.

904 {
905  XLogRecPtr flushptr;
906  int count;
907  WALReadError errinfo;
908  XLogSegNo segno;
909  TimeLineID currTLI = GetWALInsertionTimeLine();
910
911  /*
912  * Since logical decoding is only permitted on a primary server, we know
913  * that the current timeline ID can't be changing any more. If we did this
914  * on a standby, we'd have to worry about the values we compute here
915  * becoming invalid due to a promotion or timeline change.
916  */
917  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
918  sendTimeLineIsHistoric = (state->currTLI != currTLI);
919  sendTimeLine = state->currTLI;
920  sendTimeLineValidUpto = state->currTLIValidUntil;
921  sendTimeLineNextTLI = state->nextTLI;
922 
923  /* make sure we have enough WAL available */
924  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
925 
926  /* fail if not (implies we are going to shut down) */
927  if (flushptr < targetPagePtr + reqLen)
928  return -1;
929 
930  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
931  count = XLOG_BLCKSZ; /* more than one block available */
932  else
933  count = flushptr - targetPagePtr; /* part of the page available */
934 
935  /* now actually read the data, we know it's there */
936  if (!WALRead(state,
937  cur_page,
938  targetPagePtr,
939  XLOG_BLCKSZ,
940  state->seg.ws_tli, /* Pass the current TLI because only
941  * WalSndSegmentOpen controls whether new
942  * TLI is needed. */
943  &errinfo))
944  WALReadRaiseError(&errinfo);
945 
946  /*
947  * After reading into the buffer, check that what we read was valid. We do
948  * this after reading, because even though the segment was present when we
949  * opened it, it might get recycled or removed while we read it. The
950  * read() succeeds in that case, but the data we tried to read might
951  * already have been overwritten with new WAL records.
952  */
953  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
954  CheckXLogRemoved(segno, state->seg.ws_tli);
955 
956  return count;
957 }
Definition: regguts.h:318
static TimeLineID sendTimeLine
Definition: walsender.c:146
static bool sendTimeLineIsHistoric
Definition: walsender.c:148
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1535
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:147
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:149
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6100
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3430
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1492
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:735
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1042

References CheckXLogRemoved(), GetWALInsertionTimeLine(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3443 of file walsender.c.

3444 {
3445  Interval *result = palloc(sizeof(Interval));
3446 
3447  result->month = 0;
3448  result->day = 0;
3449  result->time = offset;
3450 
3451  return result;
3452 }
void * palloc(Size size)
Definition: mcxt.c:1199
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd *  cmd,
bool *  reserve_wal,
CRSSnapshotAction *  snapshot_action,
bool *  two_phase 
)
static

Definition at line 963 of file walsender.c.

967 {
968  ListCell *lc;
969  bool snapshot_action_given = false;
970  bool reserve_wal_given = false;
971  bool two_phase_given = false;
972 
973  /* Parse options */
974  foreach(lc, cmd->options)
975  {
976  DefElem *defel = (DefElem *) lfirst(lc);
977 
978  if (strcmp(defel->defname, "snapshot") == 0)
979  {
980  char *action;
981 
982  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
983  ereport(ERROR,
984  (errcode(ERRCODE_SYNTAX_ERROR),
985  errmsg("conflicting or redundant options")));
986 
987  action = defGetString(defel);
988  snapshot_action_given = true;
989 
990  if (strcmp(action, "export") == 0)
991  *snapshot_action = CRS_EXPORT_SNAPSHOT;
992  else if (strcmp(action, "nothing") == 0)
993  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
994  else if (strcmp(action, "use") == 0)
995  *snapshot_action = CRS_USE_SNAPSHOT;
996  else
997  ereport(ERROR,
998  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
999  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1000  defel->defname, action)));
1001  }
1002  else if (strcmp(defel->defname, "reserve_wal") == 0)
1003  {
1004  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1005  ereport(ERROR,
1006  (errcode(ERRCODE_SYNTAX_ERROR),
1007  errmsg("conflicting or redundant options")));
1008 
1009  reserve_wal_given = true;
1010  *reserve_wal = defGetBoolean(defel);
1011  }
1012  else if (strcmp(defel->defname, "two_phase") == 0)
1013  {
1014  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1015  ereport(ERROR,
1016  (errcode(ERRCODE_SYNTAX_ERROR),
1017  errmsg("conflicting or redundant options")));
1018  two_phase_given = true;
1019  *two_phase = defGetBoolean(defel);
1020  }
1021  else
1022  elog(ERROR, "unrecognized option: %s", defel->defname);
1023  }
1024 }
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
#define lfirst(lc)
Definition: pg_list.h:170
char * defname
Definition: parsenodes.h:777
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog(), ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3459 of file walsender.c.

3460 {
3461 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3462  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3463  SyncRepStandbyData *sync_standbys;
3464  int num_standbys;
3465  int i;
3466 
3467  InitMaterializedSRF(fcinfo, 0);
3468 
3469  /*
3470  * Get the currently active synchronous standbys. This could be out of
3471  * date before we're done, but we'll use the data anyway.
3472  */
3473  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3474 
3475  for (i = 0; i < max_wal_senders; i++)
3476  {
3477  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3478  XLogRecPtr sentPtr;
3479  XLogRecPtr write;
3480  XLogRecPtr flush;
3481  XLogRecPtr apply;
3482  TimeOffset writeLag;
3483  TimeOffset flushLag;
3484  TimeOffset applyLag;
3485  int priority;
3486  int pid;
3487  WalSndState state;
3488  TimestampTz replyTime;
3489  bool is_sync_standby;
3490  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3491  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3492  int j;
3493 
3494  /* Collect data from shared memory */
3495  SpinLockAcquire(&walsnd->mutex);
3496  if (walsnd->pid == 0)
3497  {
3498  SpinLockRelease(&walsnd->mutex);
3499  continue;
3500  }
3501  pid = walsnd->pid;
3502  sentPtr = walsnd->sentPtr;
3503  state = walsnd->state;
3504  write = walsnd->write;
3505  flush = walsnd->flush;
3506  apply = walsnd->apply;
3507  writeLag = walsnd->writeLag;
3508  flushLag = walsnd->flushLag;
3509  applyLag = walsnd->applyLag;
3510  priority = walsnd->sync_standby_priority;
3511  replyTime = walsnd->replyTime;
3512  SpinLockRelease(&walsnd->mutex);
3513 
3514  /*
3515  * Detect whether walsender is/was considered synchronous. We can
3516  * provide some protection against stale data by checking the PID
3517  * along with walsnd_index.
3518  */
3519  is_sync_standby = false;
3520  for (j = 0; j < num_standbys; j++)
3521  {
3522  if (sync_standbys[j].walsnd_index == i &&
3523  sync_standbys[j].pid == pid)
3524  {
3525  is_sync_standby = true;
3526  break;
3527  }
3528  }
3529 
3530  values[0] = Int32GetDatum(pid);
3531 
3532  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3533  {
3534  /*
3535  * Only superusers and roles with privileges of pg_read_all_stats
3536  * can see details. Other users only get the pid value to know
3537  * it's a walsender, but no details.
3538  */
3539  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3540  }
3541  else
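3542  {
3543  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3544
3545  if (XLogRecPtrIsInvalid(sentPtr))
3546  nulls[2] = true;
3547  values[2] = LSNGetDatum(sentPtr);
3548
3549  if (XLogRecPtrIsInvalid(write))
3550  nulls[3] = true;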
3551  values[3] = LSNGetDatum(write);
3552 
3553  if (XLogRecPtrIsInvalid(flush))
3554  nulls[4] = true;
3555  values[4] = LSNGetDatum(flush);
3556 
3557  if (XLogRecPtrIsInvalid(apply))
3558  nulls[5] = true;
3559  values[5] = LSNGetDatum(apply);
3560 
3561  /*
3562  * Treat a standby such as a pg_basebackup background process
3563  * which always returns an invalid flush location, as an
3564  * asynchronous standby.
3565  */
3566  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3567 
3568  if (writeLag < 0)
3569  nulls[6] = true;
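3570  else
3571  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3572
3573  if (flushLag < 0)
3574  nulls[7] = true;
3575  else
3576  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3577
3578  if (applyLag < 0)
3579  nulls[8] = true;
3580  else
3581  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3582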
3583  values[9] = Int32GetDatum(priority);
3584 
3585  /*
3586  * More easily understood version of standby state. This is purely
3587  * informational.
3588  *
3589  * In quorum-based sync replication, the role of each standby
3590  * listed in synchronous_standby_names can be changing very
3591  * frequently. Any standbys considered as "sync" at one moment can
3592  * be switched to "potential" ones at the next moment. So, it's
3593  * basically useless to report "sync" or "potential" as their sync
3594  * states. We report just "quorum" for them.
3595  */
3596  if (priority == 0)
3597  values[10] = CStringGetTextDatum("async");
3598  else if (is_sync_standby)
3599  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3600  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3601  else
3602  values[10] = CStringGetTextDatum("potential");
3603 
3604  if (replyTime == 0)
3605  nulls[11] = true;
3606  else
3607  values[11] = TimestampTzGetDatum(replyTime);
3608  }
3609 
3610  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3611  values, nulls);
3612  }
3613 
3614  return (Datum) 0;
3615 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4949
#define MemSet(start, val, len)
Definition: c.h:953
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:74
Oid GetUserId(void)
Definition: miscinit.c:497
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:560
TupleDesc setDesc
Definition: execnodes.h:332
Tuplestorestate * setResult
Definition: execnodes.h:331
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:726
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
static XLogRecPtr sentPtr
Definition: walsender.c:155
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3443
#define PG_STAT_GET_WAL_SENDERS_COLS
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3424
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, sentPtr, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2031 of file walsender.c.

2032 {
2033  bool changed = false;
2034  ReplicationSlot *slot = MyReplicationSlot;
2035
2036  Assert(lsn != InvalidXLogRecPtr);
2037  SpinLockAcquire(&slot->mutex);
2038  if (slot->data.restart_lsn != lsn)
2039  {
2040  changed = true;
2041  slot->data.restart_lsn = lsn;
2042  }
2043  SpinLockRelease(&slot->mutex);
2044 
2045  if (changed)
2046  {
2047  ReplicationSlotMarkDirty();
2048  ReplicationSlotsComputeRequiredLSN();
2049  }
2050 
2051  /*
2052  * One could argue that the slot should be saved to disk now, but that'd
2053  * be energy wasted - the worst lost information can do here is give us
2054  * wrong information in a statistics view - we'll just potentially be more
2055  * conservative in removing files.
2056  */
2057 }
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:892
XLogRecPtr restart_lsn
Definition: slot.h:73
slock_t mutex
Definition: slot.h:120

References Assert(), ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2168 of file walsender.c.

2169 {
2170  bool changed = false;
2171  ReplicationSlot *slot = MyReplicationSlot;
2172
2173  SpinLockAcquire(&slot->mutex);
2174  MyProc->xmin = InvalidTransactionId;
2175
2176  /*
2177  * For physical replication we don't need the interlock provided by xmin
2178  * and effective_xmin since the consequences of a missed increase are
2179  * limited to query cancellations, so set both at once.
2180  */
2181  if (!TransactionIdIsNormal(slot->data.xmin) ||
2182  !TransactionIdIsNormal(feedbackXmin) ||
2183  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2184  {
2185  changed = true;
2186  slot->data.xmin = feedbackXmin;
2187  slot->effective_xmin = feedbackXmin;
2188  }
2189  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2190  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2191  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2192  {
2193  changed = true;
2194  slot->data.catalog_xmin = feedbackCatalogXmin;
2195  slot->effective_catalog_xmin = feedbackCatalogXmin;
2196  }
2197  SpinLockRelease(&slot->mutex);
2198 
2199  if (changed)
2200  {
2201  ReplicationSlotMarkDirty();
2202  ReplicationSlotsComputeRequiredXmin(false);
2203  }
2204 }
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:835
TransactionId xmin
Definition: proc.h:178
TransactionId xmin
Definition: slot.h:62
TransactionId catalog_xmin
Definition: slot.h:70
TransactionId effective_catalog_xmin
Definition: slot.h:144
TransactionId effective_xmin
Definition: slot.h:143
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:273
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1413 of file walsender.c.

1414 {
1415  for (;;)
1416  {
1417  long sleeptime;
1418 
1419  /* Check for input from the client */
1420  ProcessRepliesIfAny();
1421
1422  /* die if timeout was reached */
1423  WalSndCheckTimeOut();
1424
1425  /* Send keepalive if the time has come */
1426  WalSndKeepaliveIfNecessary();
1427
1428  if (!pq_is_send_pending())
1429  break;
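1430
1431  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1432
1433  /* Sleep until something happens or we time out */
1434  WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1435  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1436
1437  /* Clear any already-pending wakeups */
1438  ResetLatch(MyLatch);
1439
1440  CHECK_FOR_INTERRUPTS();
1441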
1442  /* Process any requests or signals received recently */
1443  if (ConfigReloadPending)
1444  {
1445  ConfigReloadPending = false;
1446  ProcessConfigFile(PGC_SIGHUP);
1447  SyncRepInitConfig();
1448  }
1449 
1450  /* Try to flush pending output to the client */
1451  if (pq_flush_if_writable() != 0)
1452  WalSndShutdown();
1453  }
1454 
1455  /* reactivate latch so WalSndLoop knows to continue */
1456  SetLatch(MyLatch);
1457 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1573
struct Latch * MyLatch
Definition: globals.c:58
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:591
void ResetLatch(Latch *latch)
Definition: latch.c:683
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:412
@ WAIT_EVENT_WAL_SENDER_WRITE_DATA
Definition: wait_event.h:69
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3324
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2416
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1887
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3653
static void WalSndShutdown(void)
Definition: walsender.c:230
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2372

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WRITE_DATA, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1887 of file walsender.c.

1888 {
1889  unsigned char firstchar;
1890  int maxmsglen;
1891  int r;
1892  bool received = false;
1893 
1895 
1896  /*
1897  * If we already received a CopyDone from the frontend, any subsequent
1898  * message is the beginning of a new command, and should be processed in
1899  * the main processing loop.
1900  */
1901  while (!streamingDoneReceiving)
1902  {
1903  pq_startmsgread();
1904  r = pq_getbyte_if_available(&firstchar);
1905  if (r < 0)
1906  {
1907  /* unexpected error or EOF */
1909  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1910  errmsg("unexpected EOF on standby connection")));
1911  proc_exit(0);
1912  }
1913  if (r == 0)
1914  {
1915  /* no data available without blocking */
1916  pq_endmsgread();
1917  break;
1918  }
1919 
1920  /* Validate message type and set packet size limit */
1921  switch (firstchar)
1922  {
1923  case 'd':
1924  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1925  break;
1926  case 'c':
1927  case 'X':
1928  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1929  break;
1930  default:
1931  ereport(FATAL,
1932  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1933  errmsg("invalid standby message type \"%c\"",
1934  firstchar)));
1935  maxmsglen = 0; /* keep compiler quiet */
1936  break;
1937  }
1938 
1939  /* Read the message contents */
1941  if (pq_getmessage(&reply_message, maxmsglen))
1942  {
1944  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1945  errmsg("unexpected EOF on standby connection")));
1946  proc_exit(0);
1947  }
1948 
1949  /* ... and process it */
1950  switch (firstchar)
1951  {
1952  /*
1953  * 'd' means a standby reply wrapped in a CopyData packet.
1954  */
1955  case 'd':
1957  received = true;
1958  break;
1959 
1960  /*
1961  * CopyDone means the standby requested to finish streaming.
1962  * Reply with CopyDone, if we had not sent that already.
1963  */
1964  case 'c':
1965  if (!streamingDoneSending)
1966  {
1967  pq_putmessage_noblock('c', NULL, 0);
1968  streamingDoneSending = true;
1969  }
1970 
1971  streamingDoneReceiving = true;
1972  received = true;
1973  break;
1974 
1975  /*
1976  * 'X' means that the standby is closing down the socket.
1977  */
1978  case 'X':
1979  proc_exit(0);
1980 
1981  default:
1982  Assert(false); /* NOT REACHED */
1983  }
1984  }
1985 
1986  /*
1987  * Save the last reply timestamp if we've received at least one reply.
1988  */
1989  if (received)
1990  {
1992  waiting_for_ping_response = false;
1993  }
1994 }
#define COMMERROR
Definition: elog.h:29
#define FATAL
Definition: elog.h:37
void proc_exit(int code)
Definition: ipc.c:104
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1010
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1202
void pq_endmsgread(void)
Definition: pqcomm.c:1164
void pq_startmsgread(void)
Definition: pqcomm.c:1140
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static bool waiting_for_ping_response
Definition: walsender.c:172
static TimestampTz last_processing
Definition: walsender.c:163
static bool streamingDoneSending
Definition: walsender.c:180
static void ProcessStandbyMessage(void)
Definition: walsender.c:2000
static bool streamingDoneReceiving
Definition: walsender.c:181

References Assert(), COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2248 of file walsender.c.

2249 {
2250  TransactionId feedbackXmin;
2251  uint32 feedbackEpoch;
2252  TransactionId feedbackCatalogXmin;
2253  uint32 feedbackCatalogEpoch;
2254  TimestampTz replyTime;
2255 
2256  /*
2257  * Decipher the reply message. The caller already consumed the msgtype
2258  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2259  * of this message.
2260  */
2261  replyTime = pq_getmsgint64(&reply_message);
2262  feedbackXmin = pq_getmsgint(&reply_message, 4);
2263  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2264  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2265  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2266 
2268  {
2269  char *replyTimeStr;
2270 
2271  /* Copy because timestamptz_to_str returns a static buffer */
2272  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2273 
2274  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2275  feedbackXmin,
2276  feedbackEpoch,
2277  feedbackCatalogXmin,
2278  feedbackCatalogEpoch,
2279  replyTimeStr);
2280 
2281  pfree(replyTimeStr);
2282  }
2283 
2284  /*
2285  * Update shared state for this WalSender process based on reply data from
2286  * standby.
2287  */
2288  {
2289  WalSnd *walsnd = MyWalSnd;
2290 
2291  SpinLockAcquire(&walsnd->mutex);
2292  walsnd->replyTime = replyTime;
2293  SpinLockRelease(&walsnd->mutex);
2294  }
2295 
2296  /*
2297  * Unset WalSender's xmins if the feedback message values are invalid.
2298  * This happens when the downstream turned hot_standby_feedback off.
2299  */
2300  if (!TransactionIdIsNormal(feedbackXmin)
2301  && !TransactionIdIsNormal(feedbackCatalogXmin))
2302  {
2304  if (MyReplicationSlot != NULL)
2305  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2306  return;
2307  }
2308 
2309  /*
2310  * Check that the provided xmin/epoch are sane, that is, not in the future
2311  * and not so far back as to be already wrapped around. Ignore if not.
2312  */
2313  if (TransactionIdIsNormal(feedbackXmin) &&
2314  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2315  return;
2316 
2317  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2318  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2319  return;
2320 
2321  /*
2322  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2323  * the xmin will be taken into account by GetSnapshotData() /
2324  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2325  * thereby prevent the generation of cleanup conflicts on the standby
2326  * server.
2327  *
2328  * There is a small window for a race condition here: although we just
2329  * checked that feedbackXmin precedes nextXid, the nextXid could have
2330  * gotten advanced between our fetching it and applying the xmin below,
2331  * perhaps far enough to make feedbackXmin wrap around. In that case the
2332  * xmin we set here would be "in the future" and have no effect. No point
2333  * in worrying about this since it's too late to save the desired data
2334  * anyway. Assuming that the standby sends us an increasing sequence of
2335  * xmins, this could only happen during the first reply cycle, else our
2336  * own xmin would prevent nextXid from advancing so far.
2337  *
2338  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2339  * is assumed atomic, and there's no real need to prevent concurrent
2340  * horizon determinations. (If we're moving our xmin forward, this is
2341  * obviously safe, and if we're moving it backwards, well, the data is at
2342  * risk already since a VACUUM could already have determined the horizon.)
2343  *
2344  * If we're using a replication slot we reserve the xmin via that,
2345  * otherwise via the walsender's PGPROC entry. We can only track the
2346  * catalog xmin separately when using a slot, so we store the least of the
2347  * two provided when not using a slot.
2348  *
2349  * XXX: It might make sense to generalize the ephemeral slot concept and
2350  * always use the slot mechanism to handle the feedback xmin.
2351  */
2352  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2353  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2354  else
2355  {
2356  if (TransactionIdIsNormal(feedbackCatalogXmin)
2357  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2358  MyProc->xmin = feedbackCatalogXmin;
2359  else
2360  MyProc->xmin = feedbackXmin;
2361  }
2362 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1782
unsigned int uint32
Definition: c.h:442
uint32 TransactionId
Definition: c.h:588
bool message_level_is_interesting(int elevel)
Definition: elog.c:269
#define DEBUG2
Definition: elog.h:25
char * pstrdup(const char *in)
Definition: mcxt.c:1483
void pfree(void *pointer)
Definition: mcxt.c:1306
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2168
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2217

References DEBUG2, elog(), InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2000 of file walsender.c.

2001 {
2002  char msgtype;
2003 
2004  /*
2005  * Check message type from the first byte.
2006  */
2007  msgtype = pq_getmsgbyte(&reply_message);
2008 
2009  switch (msgtype)
2010  {
2011  case 'r':
2013  break;
2014 
2015  case 'h':
2017  break;
2018 
2019  default:
2021  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2022  errmsg("unexpected message type \"%c\"", msgtype)));
2023  proc_exit(0);
2024  }
2025 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2248
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2063

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2063 of file walsender.c.

2064 {
2065  XLogRecPtr writePtr,
2066  flushPtr,
2067  applyPtr;
2068  bool replyRequested;
2069  TimeOffset writeLag,
2070  flushLag,
2071  applyLag;
2072  bool clearLagTimes;
2073  TimestampTz now;
2074  TimestampTz replyTime;
2075 
2076  static bool fullyAppliedLastTime = false;
2077 
2078  /* the caller already consumed the msgtype byte */
2079  writePtr = pq_getmsgint64(&reply_message);
2080  flushPtr = pq_getmsgint64(&reply_message);
2081  applyPtr = pq_getmsgint64(&reply_message);
2082  replyTime = pq_getmsgint64(&reply_message);
2083  replyRequested = pq_getmsgbyte(&reply_message);
2084 
2086  {
2087  char *replyTimeStr;
2088 
2089  /* Copy because timestamptz_to_str returns a static buffer */
2090  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2091 
2092  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2093  LSN_FORMAT_ARGS(writePtr),
2094  LSN_FORMAT_ARGS(flushPtr),
2095  LSN_FORMAT_ARGS(applyPtr),
2096  replyRequested ? " (reply requested)" : "",
2097  replyTimeStr);
2098 
2099  pfree(replyTimeStr);
2100  }
2101 
2102  /* See if we can compute the round-trip lag for these positions. */
2104  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2105  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2106  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2107 
2108  /*
2109  * If the standby reports that it has fully replayed the WAL in two
2110  * consecutive reply messages, then the second such message must result
2111  * from wal_receiver_status_interval expiring on the standby. This is a
2112  * convenient time to forget the lag times measured when it last
2113  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2114  * until more WAL traffic arrives.
2115  */
2116  clearLagTimes = false;
2117  if (applyPtr == sentPtr)
2118  {
2119  if (fullyAppliedLastTime)
2120  clearLagTimes = true;
2121  fullyAppliedLastTime = true;
2122  }
2123  else
2124  fullyAppliedLastTime = false;
2125 
2126  /* Send a reply if the standby requested one. */
2127  if (replyRequested)
2129 
2130  /*
2131  * Update shared state for this WalSender process based on reply data from
2132  * standby.
2133  */
2134  {
2135  WalSnd *walsnd = MyWalSnd;
2136 
2137  SpinLockAcquire(&walsnd->mutex);
2138  walsnd->write = writePtr;
2139  walsnd->flush = flushPtr;
2140  walsnd->apply = applyPtr;
2141  if (writeLag != -1 || clearLagTimes)
2142  walsnd->writeLag = writeLag;
2143  if (flushLag != -1 || clearLagTimes)
2144  walsnd->flushLag = flushLag;
2145  if (applyLag != -1 || clearLagTimes)
2146  walsnd->applyLag = applyLag;
2147  walsnd->replyTime = replyTime;
2148  SpinLockRelease(&walsnd->mutex);
2149  }
2150 
2153 
2154  /*
2155  * Advance our local xmin horizon when the client confirmed a flush.
2156  */
2157  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2158  {
2161  else
2163  }
2164 }
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1736
#define SlotIsLogical(slot)
Definition: slot.h:169
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:441
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2031
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:3630
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3756

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog(), WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 478 of file walsender.c.

479 {
480 #define READ_REPLICATION_SLOT_COLS 3
481  ReplicationSlot *slot;
483  TupOutputState *tstate;
484  TupleDesc tupdesc;
486  bool nulls[READ_REPLICATION_SLOT_COLS];
487 
489  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
490  TEXTOID, -1, 0);
491  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
492  TEXTOID, -1, 0);
493  /* TimeLineID is unsigned, so int4 is not wide enough. */
494  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
495  INT8OID, -1, 0);
496 
497  memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
498 
499  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
500  slot = SearchNamedReplicationSlot(cmd->slotname, false);
501  if (slot == NULL || !slot->in_use)
502  {
503  LWLockRelease(ReplicationSlotControlLock);
504  }
505  else
506  {
507  ReplicationSlot slot_contents;
508  int i = 0;
509 
510  /* Copy slot contents while holding spinlock */
511  SpinLockAcquire(&slot->mutex);
512  slot_contents = *slot;
513  SpinLockRelease(&slot->mutex);
514  LWLockRelease(ReplicationSlotControlLock);
515 
516  if (OidIsValid(slot_contents.data.database))
517  ereport(ERROR,
518  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519  errmsg("cannot use %s with a logical replication slot",
520  "READ_REPLICATION_SLOT"));
521 
522  /* slot type */
523  values[i] = CStringGetTextDatum("physical");
524  nulls[i] = false;
525  i++;
526 
527  /* start LSN */
528  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
529  {
530  char xloc[64];
531 
532  snprintf(xloc, sizeof(xloc), "%X/%X",
533  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
534  values[i] = CStringGetTextDatum(xloc);
535  nulls[i] = false;
536  }
537  i++;
538 
539  /* timeline this WAL was produced on */
540  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
541  {
542  TimeLineID slots_position_timeline;
543  TimeLineID current_timeline;
544  List *timeline_history = NIL;
545 
546  /*
547  * While in recovery, use as timeline the currently-replaying one
548  * to get the LSN position's history.
549  */
550  if (RecoveryInProgress())
551  (void) GetXLogReplayRecPtr(&current_timeline);
552  else
553  current_timeline = GetWALInsertionTimeLine();
554 
555  timeline_history = readTimeLineHistory(current_timeline);
556  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
557  timeline_history);
558  values[i] = Int64GetDatum((int64) slots_position_timeline);
559  nulls[i] = false;
560  }
561  i++;
562 
564  }
565 
567  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
568  do_tup_output(tstate, values, nulls);
569  end_tup_output(tstate);
570 }
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
#define OidIsValid(objectId)
Definition: c.h:711
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1194
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1802
@ LW_SHARED
Definition: lwlock.h:113
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:374
Definition: pg_list.h:52
bool in_use
Definition: slot.h:123
#define READ_REPLICATION_SLOT_COLS

References Assert(), begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 577 of file walsender.c.

578 {
580  TupleDesc tupdesc;
582  char histfname[MAXFNAMELEN];
583  char path[MAXPGPATH];
584  int fd;
585  off_t histfilelen;
586  off_t bytesleft;
587  Size len;
588 
590 
591  /*
592  * Reply with a result set with one row, and two columns. The first col is
593  * the name of the history file, 2nd is the contents.
594  */
595  tupdesc = CreateTemplateTupleDesc(2);
596  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
597  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
598 
599  TLHistoryFileName(histfname, cmd->timeline);
600  TLHistoryFilePath(path, cmd->timeline);
601 
602  /* Send a RowDescription message */
603  dest->rStartup(dest, CMD_SELECT, tupdesc);
604 
605  /* Send a DataRow message */
606  pq_beginmessage(&buf, 'D');
607  pq_sendint16(&buf, 2); /* # of columns */
608  len = strlen(histfname);
609  pq_sendint32(&buf, len); /* col1 len */
610  pq_sendbytes(&buf, histfname, len);
611 
612  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
613  if (fd < 0)
614  ereport(ERROR,
616  errmsg("could not open file \"%s\": %m", path)));
617 
618  /* Determine file length and send it to client */
619  histfilelen = lseek(fd, 0, SEEK_END);
620  if (histfilelen < 0)
621  ereport(ERROR,
623  errmsg("could not seek to end of file \"%s\": %m", path)));
624  if (lseek(fd, 0, SEEK_SET) != 0)
625  ereport(ERROR,
627  errmsg("could not seek to beginning of file \"%s\": %m", path)));
628 
629  pq_sendint32(&buf, histfilelen); /* col2 len */
630 
631  bytesleft = histfilelen;
632  while (bytesleft > 0)
633  {
634  PGAlignedBlock rbuf;
635  int nread;
636 
638  nread = read(fd, rbuf.data, sizeof(rbuf));
640  if (nread < 0)
641  ereport(ERROR,
643  errmsg("could not read file \"%s\": %m",
644  path)));
645  else if (nread == 0)
646  ereport(ERROR,
648  errmsg("could not read file \"%s\": read %d of %zu",
649  path, nread, (Size) bytesleft)));
650 
651  pq_sendbytes(&buf, rbuf.data, nread);
652  bytesleft -= nread;
653  }
654 
655  if (CloseTransientFile(fd) != 0)
656  ereport(ERROR,
658  errmsg("could not close file \"%s\": %m", path)));
659 
660  pq_endmessage(&buf);
661 }
#define PG_BINARY
Definition: c.h:1209
size_t Size
Definition: c.h:541
int errcode_for_file_access(void)
Definition: elog.c:718
int CloseTransientFile(int fd)
Definition: fd.c:2609
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2433
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:265
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
static char * buf
Definition: pg_test_fsync.c:67
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
TimeLineID timeline
Definition: replnodes.h:108
char data[BLCKSZ]
Definition: c.h:1079
@ WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ
Definition: wait_event.h:223
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:268
static void pgstat_report_wait_end(void)
Definition: wait_event.h:284
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd, len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), TupleDescInitBuiltinEntry(), and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1244 of file walsender.c.

1245 {
1247  QueryCompletion qc;
1248 
1249  /* make sure that our requirements are still fulfilled */
1251 
1253 
1254  ReplicationSlotAcquire(cmd->slotname, true);
1255 
1257  ereport(ERROR,
1258  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1259  errmsg("cannot read from logical replication slot \"%s\"",
1260  cmd->slotname),
1261  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1262 
1263  /*
1264  * Force a disconnect, so that the decoding code doesn't need to care
1265  * about an eventual switch from running in recovery, to running in a
1266  * normal environment. Client code is expected to handle reconnects.
1267  */
1269  {
1270  ereport(LOG,
1271  (errmsg("terminating walsender process after promotion")));
1272  got_STOPPING = true;
1273  }
1274 
1275  /*
1276  * Create our decoding context, making it start at the previously ack'ed
1277  * position.
1278  *
1279  * Do this before sending a CopyBothResponse message, so that any errors
1280  * are reported early.
1281  */
1283  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1284  XL_ROUTINE(.page_read = logical_read_xlog_page,
1285  .segment_open = WalSndSegmentOpen,
1286  .segment_close = wal_segment_close),
1290 
1292 
1293  /* Send a CopyBothResponse message, and start streaming */
1294  pq_beginmessage(&buf, 'W');
1295  pq_sendbyte(&buf, 0);
1296  pq_sendint16(&buf, 0);
1297  pq_endmessage(&buf);
1298  pq_flush();
1299 
1300  /* Start reading WAL from the oldest required WAL. */
1303 
1304  /*
1305  * Report the location after which we'll send out further commits as the
1306  * current sentPtr.
1307  */
1309 
1310  /* Also update the sent position status in shared memory */
1314 
1315  replication_active = true;
1316 
1318 
1319  /* Main loop of walsender */
1321 
1324 
1325  replication_active = false;
1326  if (got_STOPPING)
1327  proc_exit(0);
1329 
1330  /* Get out of COPY mode (CommandComplete). */
1331  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1332  EndCommand(&qc, DestRemote, false);
1333 }
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
@ DestRemote
Definition: dest.h:91
int errdetail(const char *fmt,...)
Definition: elog.c:1039
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:479
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:450
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:85
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2443
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static void XLogSendLogical(void)
Definition: walsender.c:3025
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:264

References am_cascading_walsender, Assert(), buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, XLogRecPtrIsInvalid, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 670 of file walsender.c.

671 {
673  XLogRecPtr FlushPtr;
674  TimeLineID FlushTLI;
675 
676  /* create xlogreader for physical replication */
677  xlogreader =
679  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
680  .segment_close = wal_segment_close),
681  NULL);
682 
683  if (!xlogreader)
684  ereport(ERROR,
685  (errcode(ERRCODE_OUT_OF_MEMORY),
686  errmsg("out of memory"),
687  errdetail("Failed while allocating a WAL reading processor.")));
688 
689  /*
690  * We assume here that we're logging enough information in the WAL for
691  * log-shipping, since this is checked in PostmasterMain().
692  *
693  * NOTE: wal_level can only change at shutdown, so in most cases it is
694  * difficult for there to be WAL data that we can still see that was
695  * written at wal_level='minimal'.
696  */
697 
698  if (cmd->slotname)
699  {
700  ReplicationSlotAcquire(cmd->slotname, true);
702  ereport(ERROR,
703  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
704  errmsg("cannot use a logical replication slot for physical replication")));
705 
706  /*
707  * We don't need to verify the slot's restart_lsn here; instead we
708  * rely on the caller requesting the starting point to use. If the
709  * WAL segment doesn't exist, we'll fail later.
710  */
711  }
712 
713  /*
714  * Select the timeline. If it was given explicitly by the client, use
715  * that. Otherwise use the timeline of the last replayed record.
716  */
719  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
720  else
721  FlushPtr = GetFlushRecPtr(&FlushTLI);
722 
723  if (cmd->timeline != 0)
724  {
725  XLogRecPtr switchpoint;
726 
727  sendTimeLine = cmd->timeline;
728  if (sendTimeLine == FlushTLI)
729  {
730  sendTimeLineIsHistoric = false;
732  }
733  else
734  {
735  List *timeLineHistory;
736 
737  sendTimeLineIsHistoric = true;
738 
739  /*
740  * Check that the timeline the client requested exists, and the
741  * requested start location is on that timeline.
742  */
743  timeLineHistory = readTimeLineHistory(FlushTLI);
744  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
746  list_free_deep(timeLineHistory);
747 
748  /*
749  * Found the requested timeline in the history. Check that
750  * requested startpoint is on that timeline in our history.
751  *
752  * This is quite loose on purpose. We only check that we didn't
753  * fork off the requested timeline before the switchpoint. We
754  * don't check that we switched *to* it before the requested
755  * starting point. This is because the client can legitimately
756  * request to start replication from the beginning of the WAL
757  * segment that contains switchpoint, but on the new timeline, so
758  * that it doesn't end up with a partial segment. If you ask for
759  * too old a starting point, you'll get an error later when we
760  * fail to find the requested WAL segment in pg_wal.
761  *
762  * XXX: we could be more strict here and only allow a startpoint
763  * that's older than the switchpoint, if it's still in the same
764  * WAL segment.
765  */
766  if (!XLogRecPtrIsInvalid(switchpoint) &&
767  switchpoint < cmd->startpoint)
768  {
769  ereport(ERROR,
770  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
772  cmd->timeline),
773  errdetail("This server's history forked from timeline %u at %X/%X.",
774  cmd->timeline,
775  LSN_FORMAT_ARGS(switchpoint))));
776  }
777  sendTimeLineValidUpto = switchpoint;
778  }
779  }
780  else
781  {
782  sendTimeLine = FlushTLI;
784  sendTimeLineIsHistoric = false;
785  }
786 
788 
789  /* If there is nothing to stream, don't even enter COPY mode */
791  {
792  /*
793  * When we first start replication the standby will be behind the
794  * primary. For some applications, for example synchronous
795  * replication, it is important to have a clear state for this initial
796  * catchup mode, so we can trigger actions when we change streaming
797  * state later. We may stay in this state for a long time, which is
798  * exactly why we want to be able to monitor whether or not we are
799  * still here.
800  */
802 
803  /* Send a CopyBothResponse message, and start streaming */
804  pq_beginmessage(&buf, 'W');
805  pq_sendbyte(&buf, 0);
806  pq_sendint16(&buf, 0);
807  pq_endmessage(&buf);
808  pq_flush();
809 
810  /*
811  * Don't allow a request to stream from a future point in WAL that
812  * hasn't been flushed to disk in this server yet.
813  */
814  if (FlushPtr < cmd->startpoint)
815  {
816  ereport(ERROR,
817  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
819  LSN_FORMAT_ARGS(FlushPtr))));
820  }
821 
822  /* Start streaming from the requested point */
823  sentPtr = cmd->startpoint;
824 
825  /* Initialize shared memory status, too */
829 
831 
832  /* Main loop of walsender */
833  replication_active = true;
834 
836 
837  replication_active = false;
838  if (got_STOPPING)
839  proc_exit(0);
841 
843  }
844 
845  if (cmd->slotname)
847 
848  /*
849  * Copy is finished now. Send a single-row result set indicating the next
850  * timeline.
851  */
853  {
854  char startpos_str[8 + 1 + 8 + 1];
856  TupOutputState *tstate;
857  TupleDesc tupdesc;
858  Datum values[2];
859  bool nulls[2] = {0};
860 
861  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
863 
865 
866  /*
867  * Need a tuple descriptor representing two columns. int8 may seem
868  * like a surprising data type for this, but in theory int4 would not
869  * be wide enough for this, as TimeLineID is unsigned.
870  */
871  tupdesc = CreateTemplateTupleDesc(2);
872  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
873  INT8OID, -1, 0);
874  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
875  TEXTOID, -1, 0);
876 
877  /* prepare for projection of tuple */
878  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
879 
881  values[1] = CStringGetTextDatum(startpos_str);
882 
883  /* send it to dest */
884  do_tup_output(tstate, values, nulls);
885 
886  end_tup_output(tstate);
887  }
888 
889  /* Send CommandComplete message */
890  EndReplicationCommand("START_STREAMING");
891 }
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
void list_free_deep(List *list)
Definition: list.c:1559
TimeLineID timeline
Definition: replnodes.h:84
static void XLogSendPhysical(void)
Definition: walsender.c:2725
int wal_segment_size
Definition: xlog.c:146
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:108

References am_cascading_walsender, Assert(), begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2217 of file walsender.c.

2218 {
2219  FullTransactionId nextFullXid;
2220  TransactionId nextXid;
2221  uint32 nextEpoch;
2222 
2223  nextFullXid = ReadNextFullTransactionId();
2224  nextXid = XidFromFullTransactionId(nextFullXid);
2225  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2226 
2227  if (xid <= nextXid)
2228  {
2229  if (epoch != nextEpoch)
2230  return false;
2231  }
2232  else
2233  {
2234  if (epoch + 1 != nextEpoch)
2235  return false;
2236  }
2237 
2238  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2239  return false; /* epoch OK, but it's wrapped around */
2240 
2241  return true;
2242 }
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:292
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
uint32 epoch (function parameter of TransactionIdInRecentPast(); not a file-scope variable)

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2416 of file walsender.c.

2417 {
2418  TimestampTz timeout;
2419 
2420  /* don't bail out if we're doing something that doesn't require timeouts */
2421  if (last_reply_timestamp <= 0)
2422  return;
2423 
2424  timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2425  wal_sender_timeout);
2426 
2427  if (wal_sender_timeout > 0 && last_processing >= timeout)
2428  {
2429  /*
2430  * Since typically expiration of replication timeout means
2431  * communication problem, we don't send the error message to the
2432  * standby.
2433  */
2434  ereport(COMMERROR,
2435  (errmsg("terminating walsender process due to replication timeout")));
2436 
2437  WalSndShutdown();
2438  }
2439 }
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:84
int wal_sender_timeout
Definition: walsender.c:124

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2372 of file walsender.c.

2373 {
2374  long sleeptime = 10000; /* 10 s */
2375 
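2376  if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2377  {
2378  TimestampTz wakeup_time;
2379 
2380  /*
2381  * At the latest stop sleeping once wal_sender_timeout has been
2382  * reached.
2383  */
2384  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2385  wal_sender_timeout);
2386 
2387  /*
2388  * If no ping has been sent yet, wakeup when it's time to do so.
2389  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2390  * the timeout passed without a response.
2391  */
2392  if (!waiting_for_ping_response)
2393  wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2394  wal_sender_timeout / 2);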
2395 
2396  /* Compute relative time until wakeup. */
2397  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2398  }
2399 
2400  return sleeptime;
2401 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1701

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3106 of file walsender.c.

3107 {
3108  XLogRecPtr replicatedPtr;
3109 
3110  /* ... let's just be real sure we're caught up ... */
3111  send_data();
3112 
3113  /*
3114  * To figure out whether all WAL has successfully been replicated, check
3115  * flush location if valid, write otherwise. Tools like pg_receivewal will
3116  * usually (unless in synchronous mode) return an invalid flush location.
3117  */
3118  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3119  MyWalSnd->write : MyWalSnd->flush;
3120 
3121  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3122  !pq_is_send_pending())
3123  {
3124  QueryCompletion qc;
3125 
3126  /* Inform the standby that XLOG streaming is done */
3127  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3128  EndCommand(&qc, DestRemote, false);
3129  pq_flush();
3130 
3131  proc_exit(0);
3132  }
3133  if (!waiting_for_ping_response)
3134  WalSndKeepalive(true, InvalidXLogRecPtr);
3135 }
static bool WalSndCaughtUp
Definition: walsender.c:184

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 315 of file walsender.c.

316 {
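317  LWLockReleaseAll();
318  ConditionVariableCancelSleep();
319  pgstat_report_wait_end();
320 
321  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
322  wal_segment_close(xlogreader);
323 
324  if (MyReplicationSlot != NULL)
325  ReplicationSlotRelease();
326 
327  ReplicationSlotCleanup();
328 
329  replication_active = false;
330 
331  /*
332  * If there is a transaction in progress, it will clean up our
333  * ResourceOwner, but if a replication command set up a resource owner
334  * without a transaction, we've got to clean that up now.
335  */
336  if (!IsTransactionOrTransactionBlock())
337  WalSndResourceCleanup(false);
338 
339  if (got_STOPPING || got_SIGUSR2)
340  proc_exit(0);
341 
342  /* Revert back to startup state */
343  WalSndSetState(WALSNDSTATE_STARTUP);
344 }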
void ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1901
void ReplicationSlotCleanup(void)
Definition: slot.c:603
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:350
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4814

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3424 of file walsender.c.

3425 {
3426  switch (state)
3427  {
3428  case WALSNDSTATE_STARTUP:
3429  return "startup";
3430  case WALSNDSTATE_BACKUP:
3431  return "backup";
3432  case WALSNDSTATE_CATCHUP:
3433  return "catchup";
3434  case WALSNDSTATE_STREAMING:
3435  return "streaming";
3436  case WALSNDSTATE_STOPPING:
3437  return "stopping";
3438  }
3439  return "UNKNOWN";
3440 }
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3341 of file walsender.c.

3342 {
3343  int i;
3344 
3345  for (i = 0; i < max_wal_senders; i++)
3346  {
3347  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3348  pid_t pid;
3349 
3350  SpinLockAcquire(&walsnd->mutex);
3351  pid = walsnd->pid;
3352  SpinLockRelease(&walsnd->mutex);
3353 
3354  if (pid == 0)
3355  continue;
3356 
3357  SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3358  }
3359 }
#define InvalidBackendId
Definition: backendid.h:23
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 3630 of file walsender.c.

3631 {
3632  elog(DEBUG2, "sending replication keepalive");
3633 
3634  /* construct the message... */
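3635  resetStringInfo(&output_message);
3636  pq_sendbyte(&output_message, 'k');
3637  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3638  pq_sendint64(&output_message, GetCurrentTimestamp());
3639  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3640 
3641  /* ... and send it wrapped in CopyData */
3642  pq_putmessage_noblock('d', output_message.data, output_message.len);
3643 
3644  /* Set local flag */
3645  if (requestReply)
3646  waiting_for_ping_response = true;
3647 }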
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153

References StringInfoData::data, DEBUG2, elog(), GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3653 of file walsender.c.

3654 {
3655  TimestampTz ping_time;
3656 
3657  /*
3658  * Don't send keepalive messages if timeouts are globally disabled or
3659  * we're doing something not partaking in timeouts.
3660  */
3661  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3662  return;
3663 
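3664  if (waiting_for_ping_response)
3665  return;
3666 
3667  /*
3668  * If half of wal_sender_timeout has lapsed without receiving any reply
3669  * from the standby, send a keep-alive message to the standby requesting
3670  * an immediate reply.
3671  */
3672  ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3673  wal_sender_timeout / 2);
3674  if (last_processing >= ping_time)
3675  {
3676  WalSndKeepalive(true, InvalidXLogRecPtr);
3677 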
3678  /* Try to flush pending output to the client */
3679  if (pq_flush_if_writable() != 0)
3680  WalSndShutdown();
3681  }
3682 }

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2629 of file walsender.c.

2630 {
2631  WalSnd *walsnd = MyWalSnd;
2632 
2633  Assert(walsnd != NULL);
2634 
2635  MyWalSnd = NULL;
2636 
2637  SpinLockAcquire(&walsnd->mutex);
2638  /* clear latch while holding the spinlock, so it can safely be read */
2639  walsnd->latch = NULL;
2640  /* Mark WalSnd struct as no longer being in use. */
2641  walsnd->pid = 0;
2642  SpinLockRelease(&walsnd->mutex);
2643 }

References Assert(), WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3221 of file walsender.c.

3222 {
3223  int save_errno = errno;
3224 
3225  got_SIGUSR2 = true;
3226  SetLatch(MyLatch);
3227 
3228  errno = save_errno;
3229 }

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2443 of file walsender.c.

2444 {
2445  /*
2446  * Initialize the last reply timestamp. That enables timeout processing
2447  * from hereon.
2448  */
2449  last_reply_timestamp = GetCurrentTimestamp();
2450  waiting_for_ping_response = false;
2451 
2452  /*
2453  * Loop until we reach the end of this timeline or the client requests to
2454  * stop streaming.
2455  */
2456  for (;;)
2457  {
2458  /* Clear any already-pending wakeups */
2459  ResetLatch(MyLatch);
2460 
2461  CHECK_FOR_INTERRUPTS();
2462 
2463  /* Process any requests or signals received recently */
2464  if (ConfigReloadPending)
2465  {
2466  ConfigReloadPending = false;
2467  ProcessConfigFile(PGC_SIGHUP);
2468  SyncRepInitConfig();
2469  }
2470 
2471  /* Check for input from the client */
2472  ProcessRepliesIfAny();
2473 
2474  /*
2475  * If we have received CopyDone from the client, sent CopyDone
2476  * ourselves, and the output buffer is empty, it's time to exit
2477  * streaming.
2478  */
2479  if (streamingDoneReceiving && streamingDoneSending &&
2480  !pq_is_send_pending())
2481  break;
2482 
2483  /*
2484  * If we don't have any pending data in the output buffer, try to send
2485  * some more. If there is some, we don't bother to call send_data
2486  * again until we've flushed it ... but we'd better assume we are not
2487  * caught up.
2488  */
2489  if (!pq_is_send_pending())
2490  send_data();
2491  else
2492  WalSndCaughtUp = false;
2493 
2494  /* Try to flush pending output to the client */
2495  if (pq_flush_if_writable() != 0)
2496  WalSndShutdown();
2497 
2498  /* If nothing remains to be sent right now ... */
2499  if (WalSndCaughtUp && !pq_is_send_pending())
2500  {
2501  /*
2502  * If we're in catchup state, move to streaming. This is an
2503  * important state change for users to know about, since before
2504  * this point data loss might occur if the primary dies and we
2505  * need to failover to the standby. The state change is also
2506  * important for synchronous replication, since commits that
2507  * started to wait at that point might wait for some time.
2508  */
2509  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2510  {
2511  ereport(DEBUG1,
2512  (errmsg_internal("\"%s\" has now caught up with upstream server",
2513  application_name)));
2514  WalSndSetState(WALSNDSTATE_STREAMING);
2515  }
2516 
2517  /*
2518  * When SIGUSR2 arrives, we send any outstanding logs up to the
2519  * shutdown checkpoint record (i.e., the latest record), wait for
2520  * them to be replicated to the standby, and exit. This may be a
2521  * normal termination at shutdown, or a promotion, the walsender
2522  * is not sure which.
2523  */
2524  if (got_SIGUSR2)
2525  WalSndDone(send_data);
2526  }
2527 
2528  /* Check for replication timeout. */
2529  WalSndCheckTimeOut();
2530 
2531  /* Send keepalive if the time has come */
2532  WalSndKeepaliveIfNecessary();
2533 
2534  /*
2535  * Block if we have unsent data. XXX For logical replication, let
2536  * WalSndWaitForWal() handle any other blocking; idle receivers need
2537  * its additional actions. For physical replication, also block if
2538  * caught up; its send_data does not block.
2539  */
2540  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2541  !streamingDoneSending) ||
2542  pq_is_send_pending())
2543  {
2544  long sleeptime;
2545  int wakeEvents;
2546 
2547  if (!streamingDoneReceiving)
2548  wakeEvents = WL_SOCKET_READABLE;
2549  else
2550  wakeEvents = 0;
2551 
2552  /*
2553  * Use fresh timestamp, not last_processing, to reduce the chance
2554  * of reaching wal_sender_timeout before sending a keepalive.
2555  */
2556  sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
2557 
2558  if (pq_is_send_pending())
2559  wakeEvents |= WL_SOCKET_WRITEABLE;
2560 
2561  /* Sleep until something happens or we time out */
2562  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2563  }
2564  }
2565 }
char * application_name
Definition: guc_tables.c:510
@ WAIT_EVENT_WAL_SENDER_MAIN
Definition: wait_event.h:48
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3106

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1344 of file walsender.c.

1345 {
1346  /* can't have sync rep confused by sending the same LSN several times */
1347  if (!last_write)
1348  lsn = InvalidXLogRecPtr;
1349 
1350  resetStringInfo(ctx->out);
1351 
1352  pq_sendbyte(ctx->out, 'w');
1353  pq_sendint64(ctx->out, lsn); /* dataStart */
1354  pq_sendint64(ctx->out, lsn); /* walEnd */
1355 
1356  /*
1357  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1358  * reserve space here.
1359  */
1360  pq_sendint64(ctx->out, 0); /* sendtime */
1361 }
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 350 of file walsender.c.

351 {
352  ResourceOwner resowner;
353 
354  if (CurrentResourceOwner == NULL)
355  return;
356 
357  /*
358  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359  * in a local variable and clear it first.
360  */
361  resowner = CurrentResourceOwner;
362  CurrentResourceOwner = NULL;
363 
364  /* Now we can release resources and delete it. */
365  ResourceOwnerRelease(resowner,
366  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367  ResourceOwnerRelease(resowner,
368  RESOURCE_RELEASE_LOCKS, isCommit, true);
369  ResourceOwnerRelease(resowner,
370  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371  ResourceOwnerDelete(resowner);
372 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:742
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:49
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:48
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:50

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3176 of file walsender.c.

3177 {
3178  int i;
3179 
3180  for (i = 0; i < max_wal_senders; i++)
3181  {
3182  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3183 
3184  SpinLockAcquire(&walsnd->mutex);
3185  if (walsnd->pid == 0)
3186  {
3187  SpinLockRelease(&walsnd->mutex);
3188  continue;
3189  }
3190  walsnd->needreload = true;
3191  SpinLockRelease(&walsnd->mutex);
3192  }
3193 }

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2647 of file walsender.c.

2649 {
2650  char path[MAXPGPATH];
2651 
2652  /*-------
2653  * When reading from a historic timeline, and there is a timeline switch
2654  * within this segment, read from the WAL segment belonging to the new
2655  * timeline.
2656  *
2657  * For example, imagine that this server is currently on timeline 5, and
2658  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2659  * 0/13002088. In pg_wal, we have these files:
2660  *
2661  * ...
2662  * 000000040000000000000012
2663  * 000000040000000000000013
2664  * 000000050000000000000013
2665  * 000000050000000000000014
2666  * ...
2667  *
2668  * In this situation, when requested to send the WAL from segment 0x13, on
2669  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2670  * recovery prefers files from newer timelines, so if the segment was
2671  * restored from the archive on this server, the file belonging to the old
2672  * timeline, 000000040000000000000013, might not exist. Their contents are
2673  * equal up to the switchpoint, because at a timeline switch, the used
2674  * portion of the old segment is copied to the new file. -------
2675  */
2676  *tli_p = sendTimeLine;
2677  if (sendTimeLineIsHistoric)
2678  {
2679  XLogSegNo endSegNo;
2680 
2681  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2682  if (nextSegNo == endSegNo)
2683  *tli_p = sendTimeLineNextTLI;
2684  }
2685 
2686  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2687  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2688  if (state->seg.ws_file >= 0)
2689  return;
2690 
2691  /*
2692  * If the file is not found, assume it's because the standby asked for a
2693  * too old WAL segment that has already been removed or recycled.
2694  */
2695  if (errno == ENOENT)
2696  {
2697  char xlogfname[MAXFNAMELEN];
2698  int save_errno = errno;
2699 
2700  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2701  errno = save_errno;
2702  ereport(ERROR,
2703  (errcode_for_file_access(),
2704  errmsg("requested WAL segment %s has already been removed",
2705  xlogfname)));
2706  }
2707  else
2708  ereport(ERROR,
2709  (errcode_for_file_access(),
2710  errmsg("could not open file \"%s\": %m",
2711  path)));
2712 }
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:993
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3405 of file walsender.c.

3406 {
3407  WalSnd *walsnd = MyWalSnd;
3408 
3409  Assert(am_walsender);
3410 
3411  if (walsnd->state == state)
3412  return;
3413 
3414  SpinLockAcquire(&walsnd->mutex);
3415  walsnd->state = state;
3416  SpinLockRelease(&walsnd->mutex);
3417 }

References am_walsender, Assert(), WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3264 of file walsender.c.

3265 {
3266  bool found;
3267  int i;
3268 
3269  WalSndCtl = (WalSndCtlData *)
3270  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3271 
3272  if (!found)
3273  {
3274  /* First time through, so initialize */
3275  MemSet(WalSndCtl, 0, WalSndShmemSize());
3276 
3277  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3278  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3279 
3280  for (i = 0; i < max_wal_senders; i++)
3281  {
3282  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3283 
3284  SpinLockInit(&walsnd->mutex);
3285  }
3286  }
3287 }
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
#define SpinLockInit(lock)
Definition: spin.h:60
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
Size WalSndShmemSize(void)
Definition: walsender.c:3252

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3252 of file walsender.c.

3253 {
3254  Size size = 0;
3255 
3256  size = offsetof(WalSndCtlData, walsnds);
3257  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3258 
3259  return size;
3260 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

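Definition at line 230 of file walsender.c. (Note: the numbered listing below, lines 267–305, does not start at this definition line and is not WalSndShutdown()'s body. It appears to be the body of InitWalSender(): it calls InitWalSenderSlot() and announces this process to the postmaster as a WAL sender. The documentation generator attached it to this entry by mistake.)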

267 {
269 
270  /* Create a per-walsender data structure in shared memory */
272 
273  /*
274  * We don't currently need any ResourceOwner in a walsender process, but
275  * if we did, we could call CreateAuxProcessResourceOwner here.
276  */
277 
278  /*
279  * Let postmaster know that we're a WAL sender. Once we've declared us as
280  * a WAL sender process, postmaster will let us outlive the bgwriter and
281  * kill us last in the shutdown sequence, so we get a chance to stream all
282  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
283  * there's no going back, and we mustn't write any WAL records after this.
284  */
287 
288  /*
289  * If the client didn't specify a database to connect to, show in PGPROC
290  * that our advertised xmin should affect vacuum horizons in all
291  * databases. This allows physical replication clients to send hot
292  * standby feedback that will delay vacuum cleanup in all databases.
293  */
294  if (MyDatabaseId == InvalidOid)
295  {
297  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
300  LWLockRelease(ProcArrayLock);
301  }
302 
303  /* Initialize empty timestamp buffer for lag tracking. */
305 }
@ LW_EXCLUSIVE
Definition: lwlock.h:112
MemoryContext TopMemoryContext
Definition: mcxt.c:130
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1037
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:339
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:61
PROC_HDR * ProcGlobal
Definition: proc.c:80
uint8 statusFlags
Definition: proc.h:233
int pgxactoff
Definition: proc.h:188
uint8 * statusFlags
Definition: proc.h:377
static void InitWalSenderSlot(void)
Definition: walsender.c:2569

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3233 of file walsender.c.

3234 {
3235  /* Set up signal handlers */
3236  pqsignal(SIGHUP, SignalHandlerForConfigReload);
3237  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3238  pqsignal(SIGTERM, die); /* request shutdown */
3239  /* SIGQUIT handler was already set up by InitPostmasterChild */
3240  InitializeTimeouts(); /* establishes SIGALRM handler */
3241  pqsignal(SIGPIPE, SIG_IGN);
3242  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3243  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3244  * shutdown */
3245 
3246  /* Reset some signals that are accepted by postmaster but not here */
3247  pqsignal(SIGCHLD, SIG_DFL);
3248 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void die(SIGNAL_ARGS)
Definition: postgres.c (backend SIGTERM handler; the pg_test_fsync.c die(msg) macro is an unrelated symbol)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2951
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:638
void InitializeTimeouts(void)
Definition: timeout.c:474
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3221
#define SIGCHLD
Definition: win32_port.h:186
#define SIGHUP
Definition: win32_port.h:176
#define SIG_DFL
Definition: win32_port.h:171
#define SIGPIPE
Definition: win32_port.h:181
#define SIGUSR1
Definition: win32_port.h:188
#define SIGUSR2
Definition: win32_port.h:189
#define SIG_IGN
Definition: win32_port.h:173

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1467 of file walsender.c.

1469 {
1470  static TimestampTz sendTime = 0;
1471  TimestampTz now = GetCurrentTimestamp();
1472  bool pending_writes = false;
1473  bool end_xact = ctx->end_xact;
1474 
1475  /*
1476  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1477  * avoid flooding the lag tracker when we commit frequently.
1478  *
1479  * We don't have a mechanism to get the ack for any LSN other than end
1480  * xact LSN from the downstream. So, we track lag only for end of
1481  * transaction LSN.
1482  */
1483 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1484  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1485  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1486  {
1487  LagTrackerWrite(lsn, now);
1488  sendTime = now;
1489  }
1490 
1491  /*
1492  * When skipping empty transactions in synchronous replication, we send a
1493  * keepalive message to avoid delaying such transactions.
1494  *
1495  * It is okay to check sync_standbys_defined flag without lock here as in
1496  * the worst case we will just send an extra keepalive message when it is
1497  * really not required.
1498  */
1499  if (skipped_xact &&
1500  SyncRepRequested() &&
1501  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1502  {
1503  WalSndKeepalive(false, lsn);
1504 
1505  /* Try to flush pending output to the client */
1506  if (pq_flush_if_writable() != 0)
1507  WalSndShutdown();
1508 
1509  /* If we have pending write here, make sure it's actually flushed */
1510  if (pq_is_send_pending())
1511  pending_writes = true;
1512  }
1513 
1514  /*
1515  * Process pending writes if any or try to send a keepalive if required.
1516  * We don't need to try sending keep alive messages at the transaction end
1517  * as that will be done at a later point in time. This is required only
1518  * for large transactions where we don't send any changes to the
1519  * downstream and the receiver can timeout due to that.
1520  */
1521  if (pending_writes || (!end_xact &&
1522  now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1523  wal_sender_timeout / 2)))
1524  ProcessPendingWrites();
1525 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1719
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1413
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3691

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3324 of file walsender.c.

3325 {
3326  WaitEvent event;
3327 
3328  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3329  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3330  (event.events & WL_POSTMASTER_DEATH))
3331  proc_exit(1);
3332 }
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:972
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1345
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:164
uint32 events
Definition: latch.h:146

References WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, ModifyWaitEvent(), proc_exit(), WaitEventSetWait(), and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1535 of file walsender.c.

1536 {
1537  int wakeEvents;
1538  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1539 
1540  /*
1541  * Fast path to avoid acquiring the spinlock in case we already know we
1542  * have enough WAL available. This is particularly interesting if we're
1543  * far behind.
1544  */
1545  if (RecentFlushPtr != InvalidXLogRecPtr &&
1546  loc <= RecentFlushPtr)
1547  return RecentFlushPtr;
1548 
1549  /* Get a more recent flush pointer. */
1550  if (!RecoveryInProgress())
1551  RecentFlushPtr = GetFlushRecPtr(NULL);
1552  else
1553  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1554 
1555  for (;;)
1556  {
1557  long sleeptime;
1558 
1559  /* Clear any already-pending wakeups */
1561 
1563 
1564  /* Process any requests or signals received recently */
1565  if (ConfigReloadPending)
1566  {
1567  ConfigReloadPending = false;
1570  }
1571 
1572  /* Check for input from the client */
1574 
1575  /*
1576  * If we're shutting down, trigger pending WAL to be written out,
1577  * otherwise we'd possibly end up waiting for WAL that never gets
1578  * written, because walwriter has shut down already.
1579  */
1580  if (got_STOPPING)
1582 
1583  /* Update our idea of the currently flushed position. */
1584  if (!RecoveryInProgress())
1585  RecentFlushPtr = GetFlushRecPtr(NULL);
1586  else
1587  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1588 
1589  /*
1590  * If postmaster asked us to stop, don't wait anymore.
1591  *
1592  * It's important to do this check after the recomputation of
1593  * RecentFlushPtr, so we can send all remaining data before shutting
1594  * down.
1595  */
1596  if (got_STOPPING)
1597  break;
1598 
1599  /*
1600  * We only send regular messages to the client for full decoded
1601  * transactions, but synchronous replication and walsender shutdown
1602  * may be waiting for a later location. So, before sleeping, we
1603  * send a ping containing the flush location. If the receiver is
1604  * otherwise idle, this keepalive will trigger a reply. Processing the
1605  * reply will update these MyWalSnd locations.
1606  */
1607  if (MyWalSnd->flush < sentPtr &&
1608  MyWalSnd->write < sentPtr &&
1611 
1612  /* check whether we're done */
1613  if (loc <= RecentFlushPtr)
1614  break;
1615 
1616  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1617  WalSndCaughtUp = true;
1618 
1619  /*
1620  * Try to flush any pending output to the client.
1621  */
1622  if (pq_flush_if_writable() != 0)
1623  WalSndShutdown();
1624 
1625  /*
1626  * If we have received CopyDone from the client, sent CopyDone
1627  * ourselves, and the output buffer is empty, it's time to exit
1628  * streaming, so fail the current WAL fetch request.
1629  */
1631  !pq_is_send_pending())
1632  break;
1633 
1634  /* die if timeout was reached */
1636 
1637  /* Send keepalive if the time has come */
1639 
1640  /*
1641  * Sleep until something happens or we time out. Also wait for the
1642  * socket becoming writable, if there's still pending output.
1643  * Otherwise we might sit on sendable output data while waiting for
1644  * new WAL to be generated. (But if we have nothing to send, we don't
1645  * want to wake on socket-writable.)
1646  */
1648 
1649  wakeEvents = WL_SOCKET_READABLE;
1650 
1651  if (pq_is_send_pending())
1652  wakeEvents |= WL_SOCKET_WRITEABLE;
1653 
1654  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1655  }
1656 
1657  /* reactivate latch so WalSndLoop knows to continue */
1658  SetLatch(MyLatch);
1659  return RecentFlushPtr;
1660 }
@ WAIT_EVENT_WAL_SENDER_WAIT_WAL
Definition: wait_event.h:68
bool XLogBackgroundFlush(void)
Definition: xlog.c:2702

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3367 of file walsender.c.

3368 {
3369  for (;;)
3370  {
3371  int i;
3372  bool all_stopped = true;
3373 
3374  for (i = 0; i < max_wal_senders; i++)
3375  {
3376  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3377 
3378  SpinLockAcquire(&walsnd->mutex);
3379 
3380  if (walsnd->pid == 0)
3381  {
3382  SpinLockRelease(&walsnd->mutex);
3383  continue;
3384  }
3385 
3386  if (walsnd->state != WALSNDSTATE_STOPPING)
3387  {
3388  all_stopped = false;
3389  SpinLockRelease(&walsnd->mutex);
3390  break;
3391  }
3392  SpinLockRelease(&walsnd->mutex);
3393  }
3394 
3395  /* safe to leave if confirmation is done for all WAL senders */
3396  if (all_stopped)
3397  return;
3398 
3399  pg_usleep(10000L); /* wait for 10 msec */
3400  }
3401 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3296 of file walsender.c.

3297 {
3298  int i;
3299 
3300  for (i = 0; i < max_wal_senders; i++)
3301  {
3302  Latch *latch;
3303  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3304 
3305  /*
3306  * Get latch pointer with spinlock held, for the unlikely case that
3307  * pointer reads aren't atomic (as they're 8 bytes).
3308  */
3309  SpinLockAcquire(&walsnd->mutex);
3310  latch = walsnd->latch;
3311  SpinLockRelease(&walsnd->mutex);
3312 
3313  if (latch != NULL)
3314  SetLatch(latch);
3315  }
3316 }
Definition: latch.h:111

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1371 of file walsender.c.

1373 {
1374  TimestampTz now;
1375 
1376  /*
1377  * Fill the send timestamp last, so that it is taken as late as possible.
1378  * This is somewhat ugly, but the protocol is fixed, as it has already been
1379  * used for several releases by streaming physical replication.
1380  */
1383  pq_sendint64(&tmpbuf, now);
1384  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1385  tmpbuf.data, sizeof(int64));
1386 
1387  /* output previously gathered data in a CopyData packet */
1388  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1389 
1391 
1392  /* Try to flush pending output to the client */
1393  if (pq_flush_if_writable() != 0)
1394  WalSndShutdown();
1395 
1396  /* Try taking fast path unless we get too close to walsender timeout. */
1398  wal_sender_timeout / 2) &&
1399  !pq_is_send_pending())
1400  {
1401  return;
1402  }
1403 
1404  /* If we have pending write here, go to slow path */
1406 }

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3025 of file walsender.c.

3026 {
3027  XLogRecord *record;
3028  char *errm;
3029 
3030  /*
3031  * We'll use the current flush point to determine whether we've caught up.
3032  * This variable is static in order to cache it across calls. Caching is
3033  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3034  * spinlock.
3035  */
3036  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3037 
3038  /*
3039  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3040  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3041  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3042  * didn't wait - i.e. when we're shutting down.
3043  */
3044  WalSndCaughtUp = false;
3045 
3046  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3047 
3048  /* xlog record was invalid */
3049  if (errm != NULL)
3050  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3051  errm);
3052 
3053  if (record != NULL)
3054  {
3055  /*
3056  * Note the lack of any call to LagTrackerWrite(); that is handled by
3057  * WalSndUpdateProgress, which is called by the output plugin through the
3058  * logical decoding write API.
3059  */
3061 
3063  }
3064 
3065  /*
3066  * If first time through in this session, initialize flushPtr. Otherwise,
3067  * we only need to update flushPtr if EndRecPtr is past it.
3068  */
3069  if (flushPtr == InvalidXLogRecPtr)
3070  flushPtr = GetFlushRecPtr(NULL);
3071  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3072  flushPtr = GetFlushRecPtr(NULL);
3073 
3074  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3075  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3076  WalSndCaughtUp = true;
3077 
3078  /*
3079  * If we're caught up and have been requested to stop, have WalSndLoop()
3080  * terminate the connection in an orderly manner, after writing out all
3081  * the pending data.
3082  */
3084  got_SIGUSR2 = true;
3085 
3086  /* Update shared memory status */
3087  {
3088  WalSnd *walsnd = MyWalSnd;
3089 
3090  SpinLockAcquire(&walsnd->mutex);
3091  walsnd->sentPtr = sentPtr;
3092  SpinLockRelease(&walsnd->mutex);
3093  }
3094 }
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:91
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:422

References elog(), XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2725 of file walsender.c.

2726 {
2727  XLogRecPtr SendRqstPtr;
2728  XLogRecPtr startptr;
2729  XLogRecPtr endptr;
2730  Size nbytes;
2731  XLogSegNo segno;
2732  WALReadError errinfo;
2733 
2734  /* If requested switch the WAL sender to the stopping state. */
2735  if (got_STOPPING)
2737 
2739  {
2740  WalSndCaughtUp = true;
2741  return;
2742  }
2743 
2744  /* Figure out how far we can safely send the WAL. */
2746  {
2747  /*
2748  * Streaming an old timeline that's in this server's history, but is
2749  * not the one we're currently inserting or replaying. It can be
2750  * streamed up to the point where we switched off that timeline.
2751  */
2752  SendRqstPtr = sendTimeLineValidUpto;
2753  }
2754  else if (am_cascading_walsender)
2755  {
2756  TimeLineID SendRqstTLI;
2757 
2758  /*
2759  * Streaming the latest timeline on a standby.
2760  *
2761  * Attempt to send all WAL that has already been replayed, so that we
2762  * know it's valid. If we're receiving WAL through streaming
2763  * replication, it's also OK to send any WAL that has been received
2764  * but not replayed.
2765  *
2766  * The timeline we're recovering from can change, or we can be
2767  * promoted. In either case, the current timeline becomes historic. We
2768  * need to detect that so that we don't try to stream past the point
2769  * where we switched to another timeline. We check for promotion or
2770  * timeline switch after calculating FlushPtr, to avoid a race
2771  * condition: if the timeline becomes historic just after we checked
2772  * that it was still current, it's still be OK to stream it up to the
2773  * FlushPtr that was calculated before it became historic.
2774  */
2775  bool becameHistoric = false;
2776 
2777  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
2778 
2779  if (!RecoveryInProgress())
2780  {
2781  /* We have been promoted. */
2782  SendRqstTLI = GetWALInsertionTimeLine();
2783  am_cascading_walsender = false;
2784  becameHistoric = true;
2785  }
2786  else
2787  {
2788  /*
2789  * Still a cascading standby. But is the timeline we're sending
2790  * still the one recovery is recovering from?
2791  */
2792  if (sendTimeLine != SendRqstTLI)
2793  becameHistoric = true;
2794  }
2795 
2796  if (becameHistoric)
2797  {
2798  /*
2799  * The timeline we were sending has become historic. Read the
2800  * timeline history file of the new timeline to see where exactly
2801  * we forked off from the timeline we were sending.
2802  */
2803  List *history;
2804 
2805  history = readTimeLineHistory(SendRqstTLI);
2807 
2809  list_free_deep(history);
2810 
2811  sendTimeLineIsHistoric = true;
2812 
2813  SendRqstPtr = sendTimeLineValidUpto;
2814  }
2815  }
2816  else
2817  {
2818  /*
2819  * Streaming the current timeline on a primary.
2820  *
2821  * Attempt to send all data that's already been written out and
2822  * fsync'd to disk. We cannot go further than what's been written out
2823  * given the current implementation of WALRead(). And in any case
2824  * it's unsafe to send WAL that is not securely down to disk on the
2825  * primary: if the primary subsequently crashes and restarts, standbys
2826  * must not have applied any WAL that got lost on the primary.
2827  */
2828  SendRqstPtr = GetFlushRecPtr(NULL);
2829  }
2830 
2831  /*
2832  * Record the current system time as an approximation of the time at which
2833  * this WAL location was written for the purposes of lag tracking.
2834  *
2835  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2836  * is flushed and we could get that time as well as the LSN when we call
2837  * GetFlushRecPtr() above (and likewise for the cascading standby
2838  * equivalent), but rather than putting any new code into the hot WAL path
2839  * it seems good enough to capture the time here. We should reach this
2840  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2841  * may take some time, we read the WAL flush pointer and take the time
2842  * very close together here so that we'll get a later position if it is
2843  * still moving.
2844  *
2845  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2846  * this gives us a cheap approximation for the WAL flush time for this
2847  * LSN.
2848  *
2849  * Note that the LSN is not necessarily the LSN for the data contained in
2850  * the present message; it's the end of the WAL, which might be further
2851  * ahead. All the lag tracking machinery cares about is finding out when
2852  * that arbitrary LSN is eventually reported as written, flushed and
2853  * applied, so that it can measure the elapsed time.
2854  */
2855  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2856 
2857  /*
2858  * If this is a historic timeline and we've reached the point where we
2859  * forked to the next timeline, stop streaming.
2860  *
2861  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2862  * startup process will normally replay all WAL that has been received
2863  * from the primary, before promoting, but if the WAL streaming is
2864  * terminated at a WAL page boundary, the valid portion of the timeline
2865  * might end in the middle of a WAL record. We might've already sent the
2866  * first half of that partial WAL record to the cascading standby, so that
2867  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2868  * replay the partial WAL record either, so it can still follow our
2869  * timeline switch.
2870  */
2872  {
2873  /* close the current file. */
2874  if (xlogreader->seg.ws_file >= 0)
2876 
2877  /* Send CopyDone */
2878  pq_putmessage_noblock('c', NULL, 0);
2879  streamingDoneSending = true;
2880 
2881  WalSndCaughtUp = true;
2882 
2883  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2886  return;
2887  }
2888 
2889  /* Do we have any work to do? */
2890  Assert(sentPtr <= SendRqstPtr);
2891  if (SendRqstPtr <= sentPtr)
2892  {
2893  WalSndCaughtUp = true;
2894  return;
2895  }
2896 
2897  /*
2898  * Figure out how much to send in one message. If there's no more than
2899  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2900  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2901  *
2902  * The rounding is not only for performance reasons. Walreceiver relies on
2903  * the fact that we never split a WAL record across two messages. Since a
2904  * long WAL record is split at page boundary into continuation records,
2905  * page boundary is always a safe cut-off point. We also assume that
2906  * SendRqstPtr never points to the middle of a WAL record.
2907  */
2908  startptr = sentPtr;
2909  endptr = startptr;
2910  endptr += MAX_SEND_SIZE;
2911 
2912  /* if we went beyond SendRqstPtr, back off */
2913  if (SendRqstPtr <= endptr)
2914  {
2915  endptr = SendRqstPtr;
2917  WalSndCaughtUp = false;
2918  else
2919  WalSndCaughtUp = true;
2920  }
2921  else
2922  {
2923  /* round down to page boundary. */
2924  endptr -= (endptr % XLOG_BLCKSZ);
2925  WalSndCaughtUp = false;
2926  }
2927 
2928  nbytes = endptr - startptr;
2929  Assert(nbytes <= MAX_SEND_SIZE);
2930 
2931  /*
2932  * OK to read and send the slice.
2933  */
2935  pq_sendbyte(&output_message, 'w');
2936 
2937  pq_sendint64(&output_message, startptr); /* dataStart */
2938  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2939  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2940 
2941  /*
2942  * Read the log directly into the output buffer to avoid extra memcpy
2943  * calls.
2944  */
2946 
2947 retry:
2948  if (!WALRead(xlogreader,
2950  startptr,
2951  nbytes,
2952  xlogreader->seg.ws_tli, /* Pass the current TLI because
2953  * only WalSndSegmentOpen controls
2954  * whether new TLI is needed. */
2955  &errinfo))
2956  WALReadRaiseError(&errinfo);
2957 
2958  /* See logical_read_xlog_page(). */
2959  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2961 
2962  /*
2963  * During recovery, the currently-open WAL file might be replaced with the
2964  * file of the same name retrieved from archive. So we always need to
2965  * check what we read was valid after reading into the buffer. If it's
2966  * invalid, we try to open and read the file again.
2967  */
2969  {
2970  WalSnd *walsnd = MyWalSnd;
2971  bool reload;
2972 
2973  SpinLockAcquire(&walsnd->mutex);
2974  reload = walsnd->needreload;
2975  walsnd->needreload = false;
2976  SpinLockRelease(&walsnd->mutex);
2977 
2978  if (reload && xlogreader->seg.ws_file >= 0)
2979  {
2981 
2982  goto retry;
2983  }
2984  }
2985 
2986  output_message.len += nbytes;
2988 
2989  /*
2990  * Fill the send timestamp last, so that it is taken as late as possible.
2991  */
2994  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2995  tmpbuf.data, sizeof(int64));
2996 
2998 
2999  sentPtr = endptr;
3000 
3001  /* Update shared memory status */
3002  {
3003  WalSnd *walsnd = MyWalSnd;
3004 
3005  SpinLockAcquire(&walsnd->mutex);
3006  walsnd->sentPtr = sentPtr;
3007  SpinLockRelease(&walsnd->mutex);
3008  }
3009 
3010  /* Report progress of XLOG streaming in PS display */
3012  {
3013  char activitymsg[50];
3014 
3015  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3017  set_ps_display(activitymsg);
3018  }
3019 }
bool update_process_title
Definition: ps_status.c:35
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:271
#define MAX_SEND_SIZE
Definition: walsender.c:107

References am_cascading_walsender, Assert(), CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog(), enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 119 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 220 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

Definition at line 126 of file walsender.c.

Referenced by exec_replication_command().

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 198 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

StringInfoData output_message
static

Definition at line 158 of file walsender.c.

Referenced by exec_replication_command(), WalSndKeepalive(), and XLogSendPhysical().

◆ replication_active

volatile sig_atomic_t replication_active = false
static

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 181 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 131 of file walsender.c.

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader