PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void ProcessPendingWrites (void)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 208 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 107 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 226 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1041 of file walsender.c.

1042 {
1043  const char *snapshot_name = NULL;
1044  char xloc[MAXFNAMELEN];
1045  char *slot_name;
1046  bool reserve_wal = false;
1047  bool two_phase = false;
1048  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1049  DestReceiver *dest;
1050  TupOutputState *tstate;
1051  TupleDesc tupdesc;
1052  Datum values[4];
1053  bool nulls[4] = {0};
1054 
1055  Assert(!MyReplicationSlot);
1056 
1057  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1058 
1059  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1060  {
1061  ReplicationSlotCreate(cmd->slotname, false,
1062  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1063  false);
1064  }
1065  else
1066  {
1067  CheckLogicalDecodingRequirements();
1068 
1069  /*
1070  * Initially create persistent slot as ephemeral - that allows us to
1071  * nicely handle errors during initialization because it'll get
1072  * dropped if this transaction fails. We'll make it persistent at the
1073  * end. Temporary slots can be created as temporary from beginning as
1074  * they get dropped on error as well.
1075  */
1076  ReplicationSlotCreate(cmd->slotname, true,
1077  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1078  two_phase);
1079  }
1080 
1081  if (cmd->kind == REPLICATION_KIND_LOGICAL)
1082  {
1083  LogicalDecodingContext *ctx;
1084  bool need_full_snapshot = false;
1085 
1086  /*
1087  * Do options check early so that we can bail before calling the
1088  * DecodingContextFindStartpoint which can take long time.
1089  */
1090  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1091  {
1092  if (IsTransactionBlock())
1093  ereport(ERROR,
1094  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1095  (errmsg("%s must not be called inside a transaction",
1096  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1097 
1098  need_full_snapshot = true;
1099  }
1100  else if (snapshot_action == CRS_USE_SNAPSHOT)
1101  {
1102  if (!IsTransactionBlock())
1103  ereport(ERROR,
1104  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1105  (errmsg("%s must be called inside a transaction",
1106  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1107 
1108  if (XactIsoLevel != XACT_REPEATABLE_READ)
1109  ereport(ERROR,
1110  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1111  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1112  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1113  if (!XactReadOnly)
1114  ereport(ERROR,
1115  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1116  (errmsg("%s must be called in a read-only transaction",
1117  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1118 
1119  if (FirstSnapshotSet)
1120  ereport(ERROR,
1121  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1122  (errmsg("%s must be called before any query",
1123  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1124 
1125  if (IsSubTransaction())
1126  ereport(ERROR,
1127  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1128  (errmsg("%s must not be called in a subtransaction",
1129  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1130 
1131  need_full_snapshot = true;
1132  }
1133 
1134  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1135  InvalidXLogRecPtr,
1136  XL_ROUTINE(.page_read = logical_read_xlog_page,
1137  .segment_open = WalSndSegmentOpen,
1138  .segment_close = wal_segment_close),
1139  WalSndPrepareWrite, WalSndWriteData,
1140  WalSndUpdateProgress);
1141 
1142  /*
1143  * Signal that we don't need the timeout mechanism. We're just
1144  * creating the replication slot and don't yet accept feedback
1145  * messages or send keepalives. As we possibly need to wait for
1146  * further WAL the walsender would otherwise possibly be killed too
1147  * soon.
1148  */
1149  last_reply_timestamp = 0;
1150 
1151  /* build initial snapshot, might take a while */
1152  DecodingContextFindStartpoint(ctx);
1153 
1154  /*
1155  * Export or use the snapshot if we've been asked to do so.
1156  *
1157  * NB. We will convert the snapbuild.c kind of snapshot to normal
1158  * snapshot when doing this.
1159  */
1160  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1161  {
1162  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1163  }
1164  else if (snapshot_action == CRS_USE_SNAPSHOT)
1165  {
1166  Snapshot snap;
1167 
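1168  snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1169  RestoreTransactionSnapshot(snap, MyProc);
1170  }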
1171 
1172  /* don't need the decoding context anymore */
1173  FreeDecodingContext(ctx);
1174 
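1175  if (!cmd->temporary)
1176  ReplicationSlotPersist();
1177  }
1178  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1179  {
1180  ReplicationSlotReserveWal();
1181 
1182  ReplicationSlotMarkDirty();
1183 
1184  /* Write this slot to disk if it's a permanent one. */
1185  if (!cmd->temporary)
1186  ReplicationSlotSave();
1187  }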
1188 
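1189  snprintf(xloc, sizeof(xloc), "%X/%X",
1190  LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1191 
1192  dest = CreateDestReceiver(DestRemoteSimple);
1193 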
1194  /*----------
1195  * Need a tuple descriptor representing four columns:
1196  * - first field: the slot name
1197  * - second field: LSN at which we became consistent
1198  * - third field: exported snapshot's name
1199  * - fourth field: output plugin
1200  */
1201  tupdesc = CreateTemplateTupleDesc(4);
1202  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1203  TEXTOID, -1, 0);
1204  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1205  TEXTOID, -1, 0);
1206  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1207  TEXTOID, -1, 0);
1208  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1209  TEXTOID, -1, 0);
1210 
1211  /* prepare for projection of tuples */
1212  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1213 
1214  /* slot_name */
1215  slot_name = NameStr(MyReplicationSlot->data.name);
1216  values[0] = CStringGetTextDatum(slot_name);
1217 
1218  /* consistent wal location */
1219  values[1] = CStringGetTextDatum(xloc);
1220 
1221  /* snapshot name, or NULL if none */
1222  if (snapshot_name != NULL)
1223  values[2] = CStringGetTextDatum(snapshot_name);
1224  else
1225  nulls[2] = true;
1226 
1227  /* plugin, or NULL if none */
1228  if (cmd->plugin != NULL)
1229  values[3] = CStringGetTextDatum(cmd->plugin);
1230  else
1231  nulls[3] = true;
1232 
1233  /* send it to dest */
1234  do_tup_output(tstate, values, nulls);
1235  end_tup_output(tstate);
1236 
1237  ReplicationSlotRelease();
1238 }
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:94
#define NameStr(name)
Definition: c.h:735
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2334
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2276
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2256
Assert(fmt[strlen(fmt) - 1] !='\n')
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:674
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:630
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:108
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:328
#define NIL
Definition: pg_list.h:68
static bool two_phase
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotMarkDirty(void)
Definition: slot.c:798
void ReplicationSlotReserveWal(void)
Definition: slot.c:1175
void ReplicationSlotPersist(void)
Definition: slot.c:815
ReplicationSlot * MyReplicationSlot
Definition: slot.c:99
void ReplicationSlotSave(void)
Definition: slot.c:780
void ReplicationSlotRelease(void)
Definition: slot.c:549
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:253
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:669
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:570
bool FirstSnapshotSet
Definition: snapmgr.c:141
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1815
PGPROC * MyProc
Definition: proc.c:66
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:99
ReplicationSlotPersistentData data
Definition: slot.h:162
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:659
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2667
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:974
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1374
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1470
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:902
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1347
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:82
int XactIsoLevel
Definition: xact.c:79
bool IsSubTransaction(void)
Definition: xact.c:4889
bool IsTransactionBlock(void)
Definition: xact.c:4816
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:844

References Assert(), begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1244 of file walsender.c.

1245 {
1246  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1247 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:643

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1672 of file walsender.c.

1673 {
1674  int parse_rc;
1675  Node *cmd_node;
1676  const char *cmdtag;
1677  MemoryContext cmd_context;
1678  MemoryContext old_context;
1679 
1680  /*
1681  * If WAL sender has been told that shutdown is getting close, switch its
1682  * status accordingly to handle the next replication commands correctly.
1683  */
1684  if (got_STOPPING)
1685  WalSndSetState(WALSNDSTATE_STOPPING);
1686 
1687  /*
1688  * Throw error if in stopping mode. We need prevent commands that could
1689  * generate WAL while the shutdown checkpoint is being written. To be
1690  * safe, we just prohibit all new commands.
1691  */
1692  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1693  ereport(ERROR,
1694  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1695  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1696 
1697  /*
1698  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1699  * command arrives. Clean up the old stuff if there's anything.
1700  */
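1701  SnapBuildClearExportedSnapshot();
1702 
1703  CHECK_FOR_INTERRUPTS();
1704 
1705  /*
1706  * Prepare to parse and execute the command.
1707  */
1708  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1709  "Replication command context",
1710  ALLOCSET_DEFAULT_SIZES);
1711  old_context = MemoryContextSwitchTo(cmd_context);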
1712 
1713  replication_scanner_init(cmd_string);
1714 
1715  /*
1716  * Is it a WalSender command?
1717  */
1718  if (!replication_scanner_is_replication_command())
1719  {
1720  /* Nope; clean up and get out. */
1721  replication_scanner_finish();
1722 
1723  MemoryContextSwitchTo(old_context);
1724  MemoryContextDelete(cmd_context);
1725 
1726  /* XXX this is a pretty random place to make this check */
1727  if (MyDatabaseId == InvalidOid)
1728  ereport(ERROR,
1729  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1730  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1731 
1732  /* Tell the caller that this wasn't a WalSender command. */
1733  return false;
1734  }
1735 
1736  /*
1737  * Looks like a WalSender command, so parse it.
1738  */
1739  parse_rc = replication_yyparse();
1740  if (parse_rc != 0)
1741  ereport(ERROR,
1742  (errcode(ERRCODE_SYNTAX_ERROR),
1743  errmsg_internal("replication command parser returned %d",
1744  parse_rc)));
1745  replication_scanner_finish();
1746 
1747  cmd_node = replication_parse_result;
1748 
1749  /*
1750  * Report query to various monitoring facilities. For this purpose, we
1751  * report replication commands just like SQL commands.
1752  */
1753  debug_query_string = cmd_string;
1754 
1755  pgstat_report_activity(STATE_RUNNING, cmd_string);
1756 
1757  /*
1758  * Log replication command if log_replication_commands is enabled. Even
1759  * when it's disabled, log the command with DEBUG1 level for backward
1760  * compatibility.
1761  */
1762  ereport(log_replication_commands ? LOG : DEBUG1,
1763  (errmsg("received replication command: %s", cmd_string)));
1764 
1765  /*
1766  * Disallow replication commands in aborted transaction blocks.
1767  */
1768  if (IsAbortedTransactionBlockState())
1769  ereport(ERROR,
1770  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1771  errmsg("current transaction is aborted, "
1772  "commands ignored until end of transaction block")));
1773 
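1774  CHECK_FOR_INTERRUPTS();
1775 
1776  /*
1777  * Allocate buffers that will be used for each outgoing and incoming
1778  * message. We do this just once per command to reduce palloc overhead.
1779  */
1780  initStringInfo(&output_message);
1781  initStringInfo(&reply_message);
1782  initStringInfo(&tmpbuf);
1783 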
1784  switch (cmd_node->type)
1785  {
1786  case T_IdentifySystemCmd:
1787  cmdtag = "IDENTIFY_SYSTEM";
1788  set_ps_display(cmdtag);
1789  IdentifySystem();
1790  EndReplicationCommand(cmdtag);
1791  break;
1792 
1793  case T_ReadReplicationSlotCmd:
1794  cmdtag = "READ_REPLICATION_SLOT";
1795  set_ps_display(cmdtag);
1796  ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
1797  EndReplicationCommand(cmdtag);
1798  break;
1799 
1800  case T_BaseBackupCmd:
1801  cmdtag = "BASE_BACKUP";
1802  set_ps_display(cmdtag);
1803  PreventInTransactionBlock(true, cmdtag);
1804  SendBaseBackup((BaseBackupCmd *) cmd_node);
1805  EndReplicationCommand(cmdtag);
1806  break;
1807 
1808  case T_CreateReplicationSlotCmd:
1809  cmdtag = "CREATE_REPLICATION_SLOT";
1810  set_ps_display(cmdtag);
1811  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1812  EndReplicationCommand(cmdtag);
1813  break;
1814 
1815  case T_DropReplicationSlotCmd:
1816  cmdtag = "DROP_REPLICATION_SLOT";
1817  set_ps_display(cmdtag);
1818  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1819  EndReplicationCommand(cmdtag);
1820  break;
1821 
1822  case T_StartReplicationCmd:
1823  {
1824  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1825 
1826  cmdtag = "START_REPLICATION";
1827  set_ps_display(cmdtag);
1828  PreventInTransactionBlock(true, cmdtag);
1829 
1830  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1831  StartReplication(cmd);
1832  else
1833  StartLogicalReplication(cmd);
1834 
1835  /* dupe, but necessary per libpqrcv_endstreaming */
1836  EndReplicationCommand(cmdtag);
1837 
1838  Assert(xlogreader != NULL);
1839  break;
1840  }
1841 
1842  case T_TimeLineHistoryCmd:
1843  cmdtag = "TIMELINE_HISTORY";
1844  set_ps_display(cmdtag);
1845  PreventInTransactionBlock(true, cmdtag);
1846  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1847  EndReplicationCommand(cmdtag);
1848  break;
1849 
1850  case T_VariableShowStmt:
1851  {
1852  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1853  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1854 
1855  cmdtag = "SHOW";
1856  set_ps_display(cmdtag);
1857 
1858  /* syscache access needs a transaction environment */
1859  StartTransactionCommand();
1860  GetPGVariable(n->name, dest);
1861  CommitTransactionCommand();
1862  EndReplicationCommand(cmdtag);
1863  }
1864  break;
1865 
1866  default:
1867  elog(ERROR, "unrecognized replication command node tag: %u",
1868  cmd_node->type);
1869  }
1870 
1871  /* done */
1872  MemoryContextSwitchTo(old_context);
1873  MemoryContextDelete(cmd_context);
1874 
1875  /*
1876  * We need not update ps display or pg_stat_activity, because PostgresMain
1877  * will reset those to "idle". But we must reset debug_query_string to
1878  * ensure it doesn't become a dangling pointer.
1879  */
1880  debug_query_string = NULL;
1881 
1882  return true;
1883 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:964
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:201
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
int errcode(int sqlerrcode)
Definition: elog.c:858
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:89
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
const char * debug_query_string
Definition: postgres.c:85
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:730
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:82
WalSndState state
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:577
WalSnd * MyWalSnd
Definition: walsender.c:113
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:478
static StringInfoData tmpbuf
Definition: walsender.c:160
static void IdentifySystem(void)
Definition: walsender.c:395
static StringInfoData reply_message
Definition: walsender.c:159
void WalSndSetState(WalSndState state)
Definition: walsender.c:3470
static StringInfoData output_message
Definition: walsender.c:158
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
bool log_replication_commands
Definition: walsender.c:126
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1041
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1254
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1244
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:670
static XLogReaderState * xlogreader
Definition: walsender.c:138
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3481
void StartTransactionCommand(void)
Definition: xact.c:2937
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:398
void CommitTransactionCommand(void)
Definition: xact.c:3034

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog(), EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)
static

Definition at line 3170 of file walsender.c.

3171 {
3172  XLogRecPtr replayPtr;
3173  TimeLineID replayTLI;
3174  XLogRecPtr receivePtr;
3175  TimeLineID receiveTLI;
3176  XLogRecPtr result;
3177 
3178  /*
3179  * We can safely send what's already been replayed. Also, if walreceiver
3180  * is streaming WAL from the same timeline, we can send anything that it
3181  * has streamed, but hasn't been replayed yet.
3182  */
3183 
3184  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3185  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3186 
3187  if (tli)
3188  *tli = replayTLI;
3189 
3190  result = replayPtr;
3191  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3192  result = receivePtr;
3193 
3194  return result;
3195 }
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3224 of file walsender.c.

3225 {
3226  Assert(am_walsender);
3227 
3228  /*
3229  * If replication has not yet started, die like with SIGTERM. If
3230  * replication is active, only set a flag and wake up the main loop. It
3231  * will send any outstanding WAL, wait for it to be replicated to the
3232  * standby, and then exit gracefully.
3233  */
3234  if (!replication_active)
3235  kill(MyProcPid, SIGTERM);
3236  else
3237  got_STOPPING = true;
3238 }
int MyProcPid
Definition: globals.c:44
bool am_walsender
Definition: walsender.c:116
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
#define kill(pid, sig)
Definition: win32_port.h:485

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 395 of file walsender.c.

/*
 * IdentifySystem -- send a one-row, four-column result describing this
 * server.
 *
 * The tuple descriptor built below declares the columns as
 *   1 "systemid" (TEXTOID), 2 "timeline" (INT8OID),
 *   3 "xlogpos" (TEXTOID),  4 "dbname" (TEXTOID).
 * Column 3 is logptr formatted as "%X/%X"; column 4 is marked NULL when
 * dbname is still NULL.  The row is emitted with do_tup_output().
 *
 * Takes no arguments and returns nothing.
 *
 * NOTE(review): this listing is incomplete.  Source lines 401, 415,
 * 417-418, 427, 430, 432-434, 436, 439 and 466 are absent, which leaves
 * the snprintf() call, the if/else choosing the flush pointer, the
 * database-name lookup and the values[3] assignment cut short.  The
 * cross-reference list names GetSystemIdentifier(), RecoveryInProgress(),
 * am_cascading_walsender, StartTransactionCommand(), get_database_name(),
 * MemoryContextSwitchTo(), CommitTransactionCommand() and
 * CreateDestReceiver(); presumably those appear on the missing lines --
 * confirm against walsender.c before relying on this listing.
 */
396 {
397  char sysid[32];
398  char xloc[MAXFNAMELEN];
399  XLogRecPtr logptr;
400  char *dbname = NULL;
402  TupOutputState *tstate;
403  TupleDesc tupdesc;
404  Datum values[4];
405  bool nulls[4] = {0};
406  TimeLineID currTLI;
407 
408  /*
409  * Reply with a result set with one row, four columns. First col is system
410  * ID, second is timeline ID, third is current xlog location and the
411  * fourth contains the database name if we are connected to one.
412  */
413 
414  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
416 
	/* NOTE(review): the condition guarding this if/else (line 418) is not shown */
419  logptr = GetStandbyFlushRecPtr(&currTLI);
420  else
421  logptr = GetFlushRecPtr(&currTLI);
422 
423  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
424 
425  if (MyDatabaseId != InvalidOid)
426  {
428 
429  /* syscache access needs a transaction env. */
431  /* make dbname live outside TX context */
435  /* CommitTransactionCommand switches to TopMemoryContext */
437  }
438 
440 
441  /* need a tuple descriptor representing four columns */
442  tupdesc = CreateTemplateTupleDesc(4);
443  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
444  TEXTOID, -1, 0);
445  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
446  INT8OID, -1, 0);
447  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
448  TEXTOID, -1, 0);
449  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
450  TEXTOID, -1, 0);
451 
452  /* prepare for projection of tuples */
453  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
454 
455  /* column 1: system identifier */
456  values[0] = CStringGetTextDatum(sysid);
457 
458  /* column 2: timeline */
459  values[1] = Int64GetDatum(currTLI);
460 
461  /* column 3: wal location */
462  values[2] = CStringGetTextDatum(xloc);
463 
464  /* column 4: database name, or NULL if none */
465  if (dbname)
467  else
468  nulls[3] = true;
469 
470  /* send it to dest */
471  do_tup_output(tstate, values, nulls);
472 
473  end_tup_output(tstate);
474 }
#define UINT64_FORMAT
Definition: c.h:538
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3084
struct cursor * cur
Definition: ecpg.c:28
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1790
char * dbname
Definition: streamutil.c:51
bool am_cascading_walsender
Definition: walsender.c:117
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3170
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4203
bool RecoveryInProgress(void)
Definition: xlog.c:5948
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6113

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextSwitchTo(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2572 of file walsender.c.

/*
 * InitWalSenderSlot -- claim a free entry in WalSndCtl->walsnds for this
 * process.
 *
 * Walks entries 0 .. max_wal_senders-1, taking each entry's spinlock
 * (walsnd->mutex) in turn.  The first entry whose pid is 0 is claimed:
 * its pid is set to MyProcPid, its state to WALSNDSTATE_STARTUP, the LSN
 * fields to InvalidXLogRecPtr, the lag fields to -1, and its kind is
 * chosen from MyDatabaseId.  The entry's address is then stored in
 * MyWalSnd.
 *
 * Takes no arguments and returns nothing.  Both ends of the function
 * assert on MyWalSnd (NULL on entry, non-NULL after the search).
 *
 * NOTE(review): source line 2644 is absent from this listing.  The
 * cross-reference list names on_shmem_exit() and WalSndKill(), which
 * presumably is the exit-callback registration announced by the final
 * comment -- confirm against walsender.c.
 */
2573 {
2574  int i;
2575 
2576  /*
2577  * WalSndCtl should be set up already (we inherit this by fork() or
2578  * EXEC_BACKEND mechanism from the postmaster).
2579  */
2580  Assert(WalSndCtl != NULL);
2581  Assert(MyWalSnd == NULL);
2582 
2583  /*
2584  * Find a free walsender slot and reserve it. This must not fail due to
2585  * the prior check for free WAL senders in InitProcess().
2586  */
2587  for (i = 0; i < max_wal_senders; i++)
2588  {
2589  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2590 
2591  SpinLockAcquire(&walsnd->mutex);
2592 
2593  if (walsnd->pid != 0)
2594  {
		/* entry already owned by another process: release and keep looking */
2595  SpinLockRelease(&walsnd->mutex);
2596  continue;
2597  }
2598  else
2599  {
2600  /*
2601  * Found a free slot. Reserve it for us.
2602  */
2603  walsnd->pid = MyProcPid;
2604  walsnd->state = WALSNDSTATE_STARTUP;
2605  walsnd->sentPtr = InvalidXLogRecPtr;
2606  walsnd->needreload = false;
2607  walsnd->write = InvalidXLogRecPtr;
2608  walsnd->flush = InvalidXLogRecPtr;
2609  walsnd->apply = InvalidXLogRecPtr;
2610  walsnd->writeLag = -1;
2611  walsnd->flushLag = -1;
2612  walsnd->applyLag = -1;
2613  walsnd->sync_standby_priority = 0;
2614  walsnd->latch = &MyProc->procLatch;
2615  walsnd->replyTime = 0;
2616 
2617  /*
2618  * The kind assignment is done here and not in StartReplication()
2619  * and StartLogicalReplication(). Indeed, the logical walsender
2620  * needs to read WAL records (like snapshot of running
2621  * transactions) during the slot creation. So it needs to be woken
2622  * up based on its kind.
2623  *
2624  * The kind assignment could also be done in StartReplication(),
2625  * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2626  * seems better to set it in one place.
2627  */
2628  if (MyDatabaseId == InvalidOid)
2629  walsnd->kind = REPLICATION_KIND_PHYSICAL;
2630  else
2631  walsnd->kind = REPLICATION_KIND_LOGICAL;
2632 
2633  SpinLockRelease(&walsnd->mutex);
2634  /* don't need the lock anymore */
2635  MyWalSnd = (WalSnd *) walsnd;
2636 
2637  break;
2638  }
2639  }
2640 
2641  Assert(MyWalSnd != NULL);
2642 
2643  /* Arrange to clean up at walsender exit */
2645 }
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int i
Definition: isn.c:73
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch procLatch
Definition: proc.h:170
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:122
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2649
WalSndCtlData * WalSndCtl
Definition: walsender.c:110
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert(), WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProc, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3821 of file walsender.c.

/*
 * LagTrackerRead -- compute the lag for one read head of lag_tracker.
 *
 * Parameters (as shown in the Doxygen signature):
 *   head  index into lag_tracker->read_heads[] and ->last_read[]
 *   lsn   samples whose lsn is <= this value are consumed by the loop
 *   now   timestamp the result is measured against
 *
 * Returns now - time, where time is the timestamp of the last sample
 * consumed, an interpolated value between last_read[head] and the next
 * sample, or the timestamp of the next unread sample.  Returns -1 on each
 * of the "not found" paths visible below (time > now, lsn < prev.lsn,
 * prev.time > next.time, no future sample).
 *
 * NOTE(review): source lines 3831, 3833, 3843, 3862 and 3872 are absent
 * from this listing.  That truncates the two assignments inside the while
 * loop, drops the conditions guarding "last_read[head].time = 0" and the
 * "no future samples" return, and drops the declaration of `next`.  The
 * cross-reference list names LAG_TRACKER_BUFFER_SIZE, presumably used to
 * advance the read head -- confirm against walsender.c.
 */
3822 {
3823  TimestampTz time = 0;
3824 
3825  /* Read all unread samples up to this LSN or end of buffer. */
3826  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3827  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3828  {
3829  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3830  lag_tracker->last_read[head] =
3832  lag_tracker->read_heads[head] =
3834  }
3835 
3836  /*
3837  * If the lag tracker is empty, that means the standby has processed
3838  * everything we've ever sent so we should now clear 'last_read'. If we
3839  * didn't do that, we'd risk using a stale and irrelevant sample for
3840  * interpolation at the beginning of the next burst of WAL after a period
3841  * of idleness.
3842  */
3844  lag_tracker->last_read[head].time = 0;
3845 
3846  if (time > now)
3847  {
3848  /* If the clock somehow went backwards, treat as not found. */
3849  return -1;
3850  }
3851  else if (time == 0)
3852  {
3853  /*
3854  * We didn't cross a time. If there is a future sample that we
3855  * haven't reached yet, and we've already reached at least one sample,
3856  * let's interpolate the local flushed time. This is mainly useful
3857  * for reporting a completely stuck apply position as having
3858  * increasing lag, since otherwise we'd have to wait for it to
3859  * eventually start moving again and cross one of our samples before
3860  * we can show the lag increasing.
3861  */
3863  {
3864  /* There are no future samples, so we can't interpolate. */
3865  return -1;
3866  }
3867  else if (lag_tracker->last_read[head].time != 0)
3868  {
3869  /* We can interpolate between last_read and the next sample. */
3870  double fraction;
3871  WalTimeSample prev = lag_tracker->last_read[head];
3873 
3874  if (lsn < prev.lsn)
3875  {
3876  /*
3877  * Reported LSNs shouldn't normally go backwards, but it's
3878  * possible when there is a timeline change. Treat as not
3879  * found.
3880  */
3881  return -1;
3882  }
3883 
3884  Assert(prev.lsn < next.lsn);
3885 
3886  if (prev.time > next.time)
3887  {
3888  /* If the clock somehow went backwards, treat as not found. */
3889  return -1;
3890  }
3891 
3892  /* See how far we are between the previous and next samples. */
3893  fraction =
3894  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3895 
3896  /* Scale the local flush time proportionally. */
3897  time = (TimestampTz)
3898  ((double) prev.time + (next.time - prev.time) * fraction);
3899  }
3900  else
3901  {
3902  /*
3903  * We have only a future sample, implying that we were entirely
3904  * caught up, and now there is a new burst of WAL and the
3905  * standby hasn't processed the first sample yet. Until the
3906  * standby reaches the future sample the best we can do is report
3907  * the hypothetical lag if that sample were to be replayed now.
3908  */
3909  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3910  }
3911  }
3912 
3913  /* Return the elapsed time since local flush time in microseconds. */
3914  Assert(time != 0);
3915  return now - time;
3916 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
static int32 next
Definition: blutils.c:219
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:217
int write_head
Definition: walsender.c:215
TimestampTz time
Definition: walsender.c:204
XLogRecPtr lsn
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:220
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208

References Assert(), LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3756 of file walsender.c.

/*
 * LagTrackerWrite -- record an (lsn, local_flush_time) sample in
 * lag_tracker's circular buffer.
 *
 * Parameters (as shown in the Doxygen signature):
 *   lsn               position being sampled
 *   local_flush_time  timestamp stored in the sample's .time field
 *
 * Returns nothing.  Does nothing when am_walsender is false or when lsn
 * equals lag_tracker->last_lsn.  The buffer counts as full when advancing
 * write_head by one (modulo LAG_TRACKER_BUFFER_SIZE) would land on any of
 * the NUM_SYNC_REP_WAIT_MODE read heads.
 *
 * NOTE(review): source lines 3796, 3798 and 3802 are absent from this
 * listing, so both branches of the write_head rewind in the buffer-full
 * case and, judging by the cross-reference to WalTimeSample::lsn, the
 * store of the sample's lsn are not shown -- confirm against walsender.c.
 */
3757 {
3758  bool buffer_full;
3759  int new_write_head;
3760  int i;
3761 
3762  if (!am_walsender)
3763  return;
3764 
3765  /*
3766  * If the lsn hasn't advanced since last time, then do nothing. This way
3767  * we only record a new sample when new WAL has been written.
3768  */
3769  if (lag_tracker->last_lsn == lsn)
3770  return;
3771  lag_tracker->last_lsn = lsn;
3772 
3773  /*
3774  * If advancing the write head of the circular buffer would crash into any
3775  * of the read heads, then the buffer is full. In other words, the
3776  * slowest reader (presumably apply) is the one that controls the release
3777  * of space.
3778  */
3779  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3780  buffer_full = false;
3781  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3782  {
3783  if (new_write_head == lag_tracker->read_heads[i])
3784  buffer_full = true;
3785  }
3786 
3787  /*
3788  * If the buffer is full, for now we just rewind by one slot and overwrite
3789  * the last sample, as a simple (if somewhat uneven) way to lower the
3790  * sampling rate. There may be better adaptive compaction algorithms.
3791  */
3792  if (buffer_full)
3793  {
3794  new_write_head = lag_tracker->write_head;
3795  if (lag_tracker->write_head > 0)
3797  else
3799  }
3800 
3801  /* Store a sample at the current write head position. */
3803  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3804  lag_tracker->write_head = new_write_head;
3805 }
XLogRecPtr last_lsn
Definition: walsender.c:213
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 902 of file walsender.c.

/*
 * logical_read_xlog_page -- page-read routine that fills cur_page with WAL
 * starting at targetPagePtr.
 *
 * Parameters (as shown in the Doxygen signature):
 *   state          reader state; passed to XLogReadDetermineTimeline() and
 *                  WALRead(), and read for currTLI, currTLIValidUntil,
 *                  nextTLI, segcxt.ws_segsize and seg.ws_tli
 *   targetPagePtr  start position of the page to read
 *   reqLen         minimum number of bytes that must be available
 *   targetRecPtr   not referenced in the visible lines
 *   cur_page       destination buffer handed to WALRead()
 *
 * Returns -1 when the flush pointer obtained from WalSndWaitForWal() is
 * below targetPagePtr + reqLen.  Otherwise returns XLOG_BLCKSZ when a full
 * block is available, or flushptr - targetPagePtr for a partial page.
 * Note that the WALRead() call always requests XLOG_BLCKSZ bytes; only
 * the return value reflects the partial count.
 *
 * Side effects: updates the file-level sendTimeLine,
 * sendTimeLineIsHistoric, sendTimeLineValidUpto and sendTimeLineNextTLI
 * from the reader state.
 *
 * NOTE(review): source lines 924 and 926 are absent from this listing, so
 * the condition selecting between GetXLogReplayRecPtr() and
 * GetWALInsertionTimeLine() is not shown.  The cross-reference list names
 * am_cascading_walsender and RecoveryInProgress(), presumably used there
 * -- confirm against walsender.c.
 */
904 {
905  XLogRecPtr flushptr;
906  int count;
907  WALReadError errinfo;
908  XLogSegNo segno;
909  TimeLineID currTLI;
910 
911  /*
912  * Make sure we have enough WAL available before retrieving the current
913  * timeline. This is needed to determine am_cascading_walsender accurately
914  * which is needed to determine the current timeline.
915  */
916  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
917 
918  /*
919  * Since logical decoding is also permitted on a standby server, we need
920  * to check if the server is in recovery to decide how to get the current
921  * timeline ID (so that it also covers the promotion or timeline change
922  * cases).
923  */
925 
927  GetXLogReplayRecPtr(&currTLI);
928  else
929  currTLI = GetWALInsertionTimeLine();
930 
931  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
932  sendTimeLineIsHistoric = (state->currTLI != currTLI);
933  sendTimeLine = state->currTLI;
934  sendTimeLineValidUpto = state->currTLIValidUntil;
935  sendTimeLineNextTLI = state->nextTLI;
936 
937  /* fail if not (implies we are going to shut down) */
938  if (flushptr < targetPagePtr + reqLen)
939  return -1;
940 
941  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
942  count = XLOG_BLCKSZ; /* more than one block available */
943  else
944  count = flushptr - targetPagePtr; /* part of the page available */
945 
946  /* now actually read the data, we know it's there */
947  if (!WALRead(state,
948  cur_page,
949  targetPagePtr,
950  XLOG_BLCKSZ,
951  currTLI, /* Pass the current TLI because only
952  * WalSndSegmentOpen controls whether new TLI
953  * is needed. */
954  &errinfo))
955  WALReadRaiseError(&errinfo);
956 
957  /*
958  * After reading into the buffer, check that what we read was valid. We do
959  * this after reading, because even though the segment was present when we
960  * opened it, it might get recycled or removed while we read it. The
961  * read() succeeds in that case, but the data we tried to read might
962  * already have been overwritten with new WAL records.
963  */
964  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
965  CheckXLogRemoved(segno, state->seg.ws_tli);
966 
967  return count;
968 }
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:146
static bool sendTimeLineIsHistoric
Definition: walsender.c:148
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1538
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:147
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:149
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6136
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3460
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1511
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:720
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1027

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3508 of file walsender.c.

/*
 * offset_to_interval -- wrap a TimeOffset in a freshly palloc'd Interval.
 *
 * The returned Interval has month = 0, day = 0 and time = offset.  The
 * memory comes from palloc(sizeof(Interval)) and is not freed here.
 */
3509 {
3510  Interval *result = palloc(sizeof(Interval));
3511 
3512  result->month = 0;
3513  result->day = 0;
3514  result->time = offset;
3515 
3516  return result;
3517 }
void * palloc(Size size)
Definition: mcxt.c:1226
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action,
bool two_phase 
)
static

Definition at line 974 of file walsender.c.

/*
 * parseCreateReplSlotOptions -- decode the option list of a
 * CreateReplicationSlotCmd.
 *
 * Walks cmd->options and recognizes three option names:
 *   "snapshot"     only for REPLICATION_KIND_LOGICAL; the value must be
 *                  "export", "nothing" or "use" and is stored through
 *                  snapshot_action as CRS_EXPORT_SNAPSHOT,
 *                  CRS_NOEXPORT_SNAPSHOT or CRS_USE_SNAPSHOT
 *   "reserve_wal"  only for REPLICATION_KIND_PHYSICAL; boolean stored
 *                  through reserve_wal
 *   "two_phase"    only for REPLICATION_KIND_LOGICAL; boolean stored
 *                  through two_phase
 *
 * The three output parameters are written through as pointers in the
 * body (the rendered Doxygen signature does not show the '*').  An output
 * is left untouched when its option is not present; presumably the caller
 * pre-initializes them -- verify against CreateReplicationSlot().
 *
 * Errors: a repeated option, or an option used with the wrong slot kind,
 * raises ERRCODE_SYNTAX_ERROR ("conflicting or redundant options"); an
 * unknown "snapshot" value raises ERRCODE_INVALID_PARAMETER_VALUE; any
 * other option name goes to elog(ERROR).
 */
978 {
979  ListCell *lc;
980  bool snapshot_action_given = false;
981  bool reserve_wal_given = false;
982  bool two_phase_given = false;
983 
984  /* Parse options */
985  foreach(lc, cmd->options)
986  {
987  DefElem *defel = (DefElem *) lfirst(lc);
988 
989  if (strcmp(defel->defname, "snapshot") == 0)
990  {
991  char *action;
992 
993  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
994  ereport(ERROR,
995  (errcode(ERRCODE_SYNTAX_ERROR),
996  errmsg("conflicting or redundant options")));
997 
998  action = defGetString(defel);
999  snapshot_action_given = true;
1000 
1001  if (strcmp(action, "export") == 0)
1002  *snapshot_action = CRS_EXPORT_SNAPSHOT;
1003  else if (strcmp(action, "nothing") == 0)
1004  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1005  else if (strcmp(action, "use") == 0)
1006  *snapshot_action = CRS_USE_SNAPSHOT;
1007  else
1008  ereport(ERROR,
1009  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1010  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1011  defel->defname, action)));
1012  }
1013  else if (strcmp(defel->defname, "reserve_wal") == 0)
1014  {
1015  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1016  ereport(ERROR,
1017  (errcode(ERRCODE_SYNTAX_ERROR),
1018  errmsg("conflicting or redundant options")));
1019 
1020  reserve_wal_given = true;
1021  *reserve_wal = defGetBoolean(defel);
1022  }
1023  else if (strcmp(defel->defname, "two_phase") == 0)
1024  {
1025  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1026  ereport(ERROR,
1027  (errcode(ERRCODE_SYNTAX_ERROR),
1028  errmsg("conflicting or redundant options")));
1029  two_phase_given = true;
1030  *two_phase = defGetBoolean(defel);
1031  }
1032  else
1033  elog(ERROR, "unrecognized option: %s", defel->defname);
1034  }
1035 }
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:809
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog(), ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3524 of file walsender.c.

/*
 * pg_stat_get_wal_senders -- set-returning function emitting one row per
 * in-use WalSnd entry.
 *
 * Each row has PG_STAT_GET_WAL_SENDERS_COLS (12) columns.  As filled in
 * by the visible code:
 *   [0]     pid
 *   [2..5]  sent / write / flush / apply LSN (NULL when invalid)
 *   [6..8]  write / flush / apply lag (NULL when negative)
 *   [9]     sync priority (forced to 0 when the flush LSN is invalid)
 *   [10]    "async", "potential", or a sync/quorum label
 *   [11]    reply time (NULL when 0)
 * When the caller lacks the privileges of ROLE_PG_READ_ALL_STATS, every
 * column after the pid is marked NULL.
 *
 * Rows are appended to rsinfo->setResult with tuplestore_putvalues(); the
 * function itself returns (Datum) 0.  Entries whose pid is 0 are skipped.
 * Per-entry fields are copied while holding walsnd->mutex and used after
 * the lock is released.
 *
 * NOTE(review): source lines 3552, 3555, 3608, 3614, 3636, 3641, 3646 and
 * 3664 are absent from this listing.  That hides the declarations of
 * `state` and `values`, the assignment of values[1], the condition ahead
 * of "nulls[3] = true", the three lag assignments and the left-hand side
 * of the sync/quorum choice.  The cross-reference list names
 * WalSndGetStateString(), IntervalPGetDatum(), offset_to_interval(),
 * SyncRepConfig and SYNC_REP_PRIORITY, presumably used on those lines --
 * confirm against walsender.c.
 */
3525 {
3526 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3527  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3528  SyncRepStandbyData *sync_standbys;
3529  int num_standbys;
3530  int i;
3531 
3532  InitMaterializedSRF(fcinfo, 0);
3533 
3534  /*
3535  * Get the currently active synchronous standbys. This could be out of
3536  * date before we're done, but we'll use the data anyway.
3537  */
3538  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3539 
3540  for (i = 0; i < max_wal_senders; i++)
3541  {
3542  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3543  XLogRecPtr sent_ptr;
3544  XLogRecPtr write;
3545  XLogRecPtr flush;
3546  XLogRecPtr apply;
3547  TimeOffset writeLag;
3548  TimeOffset flushLag;
3549  TimeOffset applyLag;
3550  int priority;
3551  int pid;
3553  TimestampTz replyTime;
3554  bool is_sync_standby;
3556  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3557  int j;
3558 
3559  /* Collect data from shared memory */
3560  SpinLockAcquire(&walsnd->mutex);
3561  if (walsnd->pid == 0)
3562  {
3563  SpinLockRelease(&walsnd->mutex);
3564  continue;
3565  }
3566  pid = walsnd->pid;
3567  sent_ptr = walsnd->sentPtr;
3568  state = walsnd->state;
3569  write = walsnd->write;
3570  flush = walsnd->flush;
3571  apply = walsnd->apply;
3572  writeLag = walsnd->writeLag;
3573  flushLag = walsnd->flushLag;
3574  applyLag = walsnd->applyLag;
3575  priority = walsnd->sync_standby_priority;
3576  replyTime = walsnd->replyTime;
3577  SpinLockRelease(&walsnd->mutex);
3578 
3579  /*
3580  * Detect whether walsender is/was considered synchronous. We can
3581  * provide some protection against stale data by checking the PID
3582  * along with walsnd_index.
3583  */
3584  is_sync_standby = false;
3585  for (j = 0; j < num_standbys; j++)
3586  {
3587  if (sync_standbys[j].walsnd_index == i &&
3588  sync_standbys[j].pid == pid)
3589  {
3590  is_sync_standby = true;
3591  break;
3592  }
3593  }
3594 
3595  values[0] = Int32GetDatum(pid);
3596 
3597  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3598  {
3599  /*
3600  * Only superusers and roles with privileges of pg_read_all_stats
3601  * can see details. Other users only get the pid value to know
3602  * it's a walsender, but no details.
3603  */
3604  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3605  }
3606  else
3607  {
3609 
		/* each LSN column keeps its Datum but is flagged NULL when invalid */
3610  if (XLogRecPtrIsInvalid(sent_ptr))
3611  nulls[2] = true;
3612  values[2] = LSNGetDatum(sent_ptr);
3613 
3615  nulls[3] = true;
3616  values[3] = LSNGetDatum(write);
3617 
3618  if (XLogRecPtrIsInvalid(flush))
3619  nulls[4] = true;
3620  values[4] = LSNGetDatum(flush);
3621 
3622  if (XLogRecPtrIsInvalid(apply))
3623  nulls[5] = true;
3624  values[5] = LSNGetDatum(apply);
3625 
3626  /*
3627  * Treat a standby such as a pg_basebackup background process
3628  * which always returns an invalid flush location, as an
3629  * asynchronous standby.
3630  */
3631  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3632 
3633  if (writeLag < 0)
3634  nulls[6] = true;
3635  else
3637 
3638  if (flushLag < 0)
3639  nulls[7] = true;
3640  else
3642 
3643  if (applyLag < 0)
3644  nulls[8] = true;
3645  else
3647 
3648  values[9] = Int32GetDatum(priority);
3649 
3650  /*
3651  * More easily understood version of standby state. This is purely
3652  * informational.
3653  *
3654  * In quorum-based sync replication, the role of each standby
3655  * listed in synchronous_standby_names can be changing very
3656  * frequently. Any standbys considered as "sync" at one moment can
3657  * be switched to "potential" ones at the next moment. So, it's
3658  * basically useless to report "sync" or "potential" as their sync
3659  * states. We report just "quorum" for them.
3660  */
3661  if (priority == 0)
3662  values[10] = CStringGetTextDatum("async");
3663  else if (is_sync_standby)
3665  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3666  else
3667  values[10] = CStringGetTextDatum("potential");
3668 
3669  if (replyTime == 0)
3670  nulls[11] = true;
3671  else
3672  values[11] = TimestampTzGetDatum(replyTime);
3673  }
3674 
3675  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3676  values, nulls);
3677  }
3678 
3679  return (Datum) 0;
3680 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4961
#define MemSet(start, val, len)
Definition: c.h:1009
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:74
Oid GetUserId(void)
Definition: miscinit.c:509
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
TupleDesc setDesc
Definition: execnodes.h:334
Tuplestorestate * setResult
Definition: execnodes.h:333
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:717
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3508
#define PG_STAT_GET_WAL_SENDERS_COLS
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3489
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2034 of file walsender.c.

/*
 * PhysicalConfirmReceivedLocation -- move the slot's restart_lsn to lsn.
 *
 * lsn must not be InvalidXLogRecPtr (asserted).  Under slot->mutex,
 * slot->data.restart_lsn is overwritten with lsn when the two differ, and
 * `changed` records whether that happened.  Returns nothing.
 *
 * NOTE(review): source lines 2037, 2050 and 2051 are absent from this
 * listing, so the declaration of `slot` and the body of "if (changed)"
 * are not shown.  The cross-reference list names MyReplicationSlot,
 * ReplicationSlotMarkDirty() and ReplicationSlotsComputeRequiredLSN(),
 * presumably used on those lines -- confirm against walsender.c.
 */
2035 {
2036  bool changed = false;
2038 
2039  Assert(lsn != InvalidXLogRecPtr);
2040  SpinLockAcquire(&slot->mutex);
2041  if (slot->data.restart_lsn != lsn)
2042  {
2043  changed = true;
2044  slot->data.restart_lsn = lsn;
2045  }
2046  SpinLockRelease(&slot->mutex);
2047 
2048  if (changed)
2049  {
2052  }
2053 
2054  /*
2055  * One could argue that the slot should be saved to disk now, but that'd
2056  * be energy wasted - the worst thing lost information could cause here is
2057  * to give wrong information in a statistics view - we'll just potentially
2058  * be more conservative in removing files.
2059  */
2060 }
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:893
XLogRecPtr restart_lsn
Definition: slot.h:88
slock_t mutex
Definition: slot.h:135

References Assert(), ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2171 of file walsender.c.

/*
 * PhysicalReplicationSlotNewXmin -- apply feedback xmin values to the
 * slot.
 *
 * Parameters (as shown in the Doxygen signature):
 *   feedbackXmin         new value for data.xmin / effective_xmin
 *   feedbackCatalogXmin  new value for data.catalog_xmin /
 *                        effective_catalog_xmin
 *
 * Under slot->mutex, each pair is overwritten when the stored value is
 * not a normal transaction id, the feedback value is not a normal
 * transaction id, or the stored value precedes the feedback value
 * (TransactionIdPrecedes).  `changed` is set if either pair was written.
 * Returns nothing.
 *
 * NOTE(review): source lines 2174, 2177, 2204 and 2205 are absent from
 * this listing, so the declaration of `slot`, one statement right after
 * the lock is taken, and the body of "if (changed)" are not shown.  The
 * cross-reference list names MyReplicationSlot, MyProc, PGPROC::xmin,
 * InvalidTransactionId, ReplicationSlotMarkDirty() and
 * ReplicationSlotsComputeRequiredXmin(), presumably used on those lines --
 * confirm against walsender.c.
 */
2172 {
2173  bool changed = false;
2175 
2176  SpinLockAcquire(&slot->mutex);
2178 
2179  /*
2180  * For physical replication we don't need the interlock provided by xmin
2181  * and effective_xmin since the consequences of a missed increase are
2182  * limited to query cancellations, so set both at once.
2183  */
2184  if (!TransactionIdIsNormal(slot->data.xmin) ||
2185  !TransactionIdIsNormal(feedbackXmin) ||
2186  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2187  {
2188  changed = true;
2189  slot->data.xmin = feedbackXmin;
2190  slot->effective_xmin = feedbackXmin;
2191  }
2192  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2193  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2194  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2195  {
2196  changed = true;
2197  slot->data.catalog_xmin = feedbackCatalogXmin;
2198  slot->effective_catalog_xmin = feedbackCatalogXmin;
2199  }
2200  SpinLockRelease(&slot->mutex);
2201 
2202  if (changed)
2203  {
2206  }
2207 }
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:837
TransactionId xmin
Definition: proc.h:178
TransactionId xmin
Definition: slot.h:77
TransactionId catalog_xmin
Definition: slot.h:85
TransactionId effective_catalog_xmin
Definition: slot.h:159
TransactionId effective_xmin
Definition: slot.h:158
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1416 of file walsender.c.

/*
 * ProcessPendingWrites -- loop until no output to the client is pending.
 *
 * Each pass leaves the loop as soon as pq_is_send_pending() is false.
 * Otherwise it waits, clears ConfigReloadPending if set, and then calls
 * pq_flush_if_writable(); a non-zero result from that call leads to
 * WalSndShutdown().  After the loop MyLatch is set again so that, per the
 * existing comment, WalSndLoop knows to continue.
 *
 * Takes no arguments and returns nothing.
 *
 * NOTE(review): source lines 1423, 1426, 1429, 1434, 1437, 1441, 1443,
 * 1449 and 1450 are absent from this listing, so most of the calls that
 * the inline comments describe are not shown, including the start of the
 * wait call whose last argument is WAIT_EVENT_WAL_SENDER_WRITE_DATA.  The
 * cross-reference list names ProcessRepliesIfAny(), WalSndCheckTimeOut(),
 * WalSndKeepaliveIfNecessary(), WalSndComputeSleeptime(),
 * GetCurrentTimestamp(), WalSndWait(), ResetLatch(),
 * CHECK_FOR_INTERRUPTS, ProcessConfigFile() and SyncRepInitConfig(),
 * presumably the missing calls -- confirm against walsender.c.
 */
1417 {
1418  for (;;)
1419  {
1420  long sleeptime;
1421 
1422  /* Check for input from the client */
1424 
1425  /* die if timeout was reached */
1427 
1428  /* Send keepalive if the time has come */
1430 
1431  if (!pq_is_send_pending())
1432  break;
1433 
1435 
1436  /* Sleep until something happens or we time out */
1438  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1439 
1440  /* Clear any already-pending wakeups */
1442 
1444 
1445  /* Process any requests or signals received recently */
1446  if (ConfigReloadPending)
1447  {
1448  ConfigReloadPending = false;
1451  }
1452 
1453  /* Try to flush pending output to the client */
1454  if (pq_flush_if_writable() != 0)
1455  WalSndShutdown();
1456  }
1457 
1458  /* reactivate latch so WalSndLoop knows to continue */
1459  SetLatch(MyLatch);
1460 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
struct Latch * MyLatch
Definition: globals.c:58
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:605
void ResetLatch(Latch *latch)
Definition: latch.c:697
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:403
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3354
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2419
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1890
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3718
static void WalSndShutdown(void)
Definition: walsender.c:230
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2375

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1890 of file walsender.c.

1891 {
1892  unsigned char firstchar;
1893  int maxmsglen;
1894  int r;
1895  bool received = false;
1896 
1897  last_processing = GetCurrentTimestamp();
1898 
1899  /*
1900  * If we already received a CopyDone from the frontend, any subsequent
1901  * message is the beginning of a new command, and should be processed in
1902  * the main processing loop.
1903  */
1904  while (!streamingDoneReceiving)
1905  {
1906  pq_startmsgread();
1907  r = pq_getbyte_if_available(&firstchar);
1908  if (r < 0)
1909  {
1910  /* unexpected error or EOF */
1911  ereport(COMMERROR,
1912  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1913  errmsg("unexpected EOF on standby connection")));
1914  proc_exit(0);
1915  }
1916  if (r == 0)
1917  {
1918  /* no data available without blocking */
1919  pq_endmsgread();
1920  break;
1921  }
1922 
1923  /* Validate message type and set packet size limit */
1924  switch (firstchar)
1925  {
1926  case PqMsg_CopyData:
1927  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1928  break;
1929  case PqMsg_CopyDone:
1930  case PqMsg_Terminate:
1931  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1932  break;
1933  default:
1934  ereport(FATAL,
1935  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1936  errmsg("invalid standby message type \"%c\"",
1937  firstchar)));
1938  maxmsglen = 0; /* keep compiler quiet */
1939  break;
1940  }
1941 
1942  /* Read the message contents */
1943  resetStringInfo(&reply_message);
1944  if (pq_getmessage(&reply_message, maxmsglen))
1945  {
1946  ereport(COMMERROR,
1947  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1948  errmsg("unexpected EOF on standby connection")));
1949  proc_exit(0);
1950  }
1951 
1952  /* ... and process it */
1953  switch (firstchar)
1954  {
1955  /*
1956  * 'd' means a standby reply wrapped in a CopyData packet.
1957  */
1958  case PqMsg_CopyData:
1959  ProcessStandbyMessage();
1960  received = true;
1961  break;
1962 
1963  /*
1964  * CopyDone means the standby requested to finish streaming.
1965  * Reply with CopyDone, if we had not sent that already.
1966  */
1967  case PqMsg_CopyDone:
1968  if (!streamingDoneSending)
1969  {
1970  pq_putmessage_noblock('c', NULL, 0);
1971  streamingDoneSending = true;
1972  }
1973 
1974  streamingDoneReceiving = true;
1975  received = true;
1976  break;
1977 
1978  /*
1979  * 'X' means that the standby is closing down the socket.
1980  */
1981  case PqMsg_Terminate:
1982  proc_exit(0);
1983 
1984  default:
1985  Assert(false); /* NOT REACHED */
1986  }
1987  }
1988 
1989  /*
1990  * Save the last reply timestamp if we've received at least one reply.
1991  */
1992  if (received)
1993  {
1994  last_reply_timestamp = last_processing;
1995  waiting_for_ping_response = false;
1996  }
1997 }
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1020
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1212
void pq_endmsgread(void)
Definition: pqcomm.c:1174
void pq_startmsgread(void)
Definition: pqcomm.c:1150
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static bool waiting_for_ping_response
Definition: walsender.c:172
static TimestampTz last_processing
Definition: walsender.c:163
static bool streamingDoneSending
Definition: walsender.c:180
static void ProcessStandbyMessage(void)
Definition: walsender.c:2003
static bool streamingDoneReceiving
Definition: walsender.c:181

References Assert(), COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2251 of file walsender.c.

2252 {
2253  TransactionId feedbackXmin;
2254  uint32 feedbackEpoch;
2255  TransactionId feedbackCatalogXmin;
2256  uint32 feedbackCatalogEpoch;
2257  TimestampTz replyTime;
2258 
2259  /*
2260  * Decipher the reply message. The caller already consumed the msgtype
2261  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2262  * of this message.
2263  */
2264  replyTime = pq_getmsgint64(&reply_message);
2265  feedbackXmin = pq_getmsgint(&reply_message, 4);
2266  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2267  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2268  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2269 
2270  if (message_level_is_interesting(DEBUG2))
2271  {
2272  char *replyTimeStr;
2273 
2274  /* Copy because timestamptz_to_str returns a static buffer */
2275  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2276 
2277  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2278  feedbackXmin,
2279  feedbackEpoch,
2280  feedbackCatalogXmin,
2281  feedbackCatalogEpoch,
2282  replyTimeStr);
2283 
2284  pfree(replyTimeStr);
2285  }
2286 
2287  /*
2288  * Update shared state for this WalSender process based on reply data from
2289  * standby.
2290  */
2291  {
2292  WalSnd *walsnd = MyWalSnd;
2293 
2294  SpinLockAcquire(&walsnd->mutex);
2295  walsnd->replyTime = replyTime;
2296  SpinLockRelease(&walsnd->mutex);
2297  }
2298 
2299  /*
2300  * Unset WalSender's xmins if the feedback message values are invalid.
2301  * This happens when the downstream turned hot_standby_feedback off.
2302  */
2303  if (!TransactionIdIsNormal(feedbackXmin)
2304  && !TransactionIdIsNormal(feedbackCatalogXmin))
2305  {
2306  MyProc->xmin = InvalidTransactionId;
2307  if (MyReplicationSlot != NULL)
2308  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2309  return;
2310  }
2311 
2312  /*
2313  * Check that the provided xmin/epoch are sane, that is, not in the future
2314  * and not so far back as to be already wrapped around. Ignore if not.
2315  */
2316  if (TransactionIdIsNormal(feedbackXmin) &&
2317  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2318  return;
2319 
2320  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2321  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2322  return;
2323 
2324  /*
2325  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2326  * the xmin will be taken into account by GetSnapshotData() /
2327  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2328  * thereby prevent the generation of cleanup conflicts on the standby
2329  * server.
2330  *
2331  * There is a small window for a race condition here: although we just
2332  * checked that feedbackXmin precedes nextXid, the nextXid could have
2333  * gotten advanced between our fetching it and applying the xmin below,
2334  * perhaps far enough to make feedbackXmin wrap around. In that case the
2335  * xmin we set here would be "in the future" and have no effect. No point
2336  * in worrying about this since it's too late to save the desired data
2337  * anyway. Assuming that the standby sends us an increasing sequence of
2338  * xmins, this could only happen during the first reply cycle, else our
2339  * own xmin would prevent nextXid from advancing so far.
2340  *
2341  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2342  * is assumed atomic, and there's no real need to prevent concurrent
2343  * horizon determinations. (If we're moving our xmin forward, this is
2344  * obviously safe, and if we're moving it backwards, well, the data is at
2345  * risk already since a VACUUM could already have determined the horizon.)
2346  *
2347  * If we're using a replication slot we reserve the xmin via that,
2348  * otherwise via the walsender's PGPROC entry. We can only track the
2349  * catalog xmin separately when using a slot, so we store the least of the
2350  * two provided when not using a slot.
2351  *
2352  * XXX: It might make sense to generalize the ephemeral slot concept and
2353  * always use the slot mechanism to handle the feedback xmin.
2354  */
2355  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2356  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2357  else
2358  {
2359  if (TransactionIdIsNormal(feedbackCatalogXmin)
2360  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2361  MyProc->xmin = feedbackCatalogXmin;
2362  else
2363  MyProc->xmin = feedbackXmin;
2364  }
2365 }
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1782
unsigned int uint32
Definition: c.h:495
uint32 TransactionId
Definition: c.h:641
bool message_level_is_interesting(int elevel)
Definition: elog.c:277
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1644
void pfree(void *pointer)
Definition: mcxt.c:1456
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:418
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:456
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2171
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2220

References DEBUG2, elog(), InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2003 of file walsender.c.

2004 {
2005  char msgtype;
2006 
2007  /*
2008  * Check message type from the first byte.
2009  */
2010  msgtype = pq_getmsgbyte(&reply_message);
2011 
2012  switch (msgtype)
2013  {
2014  case 'r':
2015  ProcessStandbyReplyMessage();
2016  break;
2017 
2018  case 'h':
2019  ProcessStandbyHSFeedbackMessage();
2020  break;
2021 
2022  default:
2023  ereport(COMMERROR,
2024  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2025  errmsg("unexpected message type \"%c\"", msgtype)));
2026  proc_exit(0);
2027  }
2028 }
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2251
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2066

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2066 of file walsender.c.

2067 {
2068  XLogRecPtr writePtr,
2069  flushPtr,
2070  applyPtr;
2071  bool replyRequested;
2072  TimeOffset writeLag,
2073  flushLag,
2074  applyLag;
2075  bool clearLagTimes;
2076  TimestampTz now;
2077  TimestampTz replyTime;
2078 
2079  static bool fullyAppliedLastTime = false;
2080 
2081  /* the caller already consumed the msgtype byte */
2082  writePtr = pq_getmsgint64(&reply_message);
2083  flushPtr = pq_getmsgint64(&reply_message);
2084  applyPtr = pq_getmsgint64(&reply_message);
2085  replyTime = pq_getmsgint64(&reply_message);
2086  replyRequested = pq_getmsgbyte(&reply_message);
2087 
2088  if (message_level_is_interesting(DEBUG2))
2089  {
2090  char *replyTimeStr;
2091 
2092  /* Copy because timestamptz_to_str returns a static buffer */
2093  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2094 
2095  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2096  LSN_FORMAT_ARGS(writePtr),
2097  LSN_FORMAT_ARGS(flushPtr),
2098  LSN_FORMAT_ARGS(applyPtr),
2099  replyRequested ? " (reply requested)" : "",
2100  replyTimeStr);
2101 
2102  pfree(replyTimeStr);
2103  }
2104 
2105  /* See if we can compute the round-trip lag for these positions. */
2106  now = GetCurrentTimestamp();
2107  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2108  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2109  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2110 
2111  /*
2112  * If the standby reports that it has fully replayed the WAL in two
2113  * consecutive reply messages, then the second such message must result
2114  * from wal_receiver_status_interval expiring on the standby. This is a
2115  * convenient time to forget the lag times measured when it last
2116  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2117  * until more WAL traffic arrives.
2118  */
2119  clearLagTimes = false;
2120  if (applyPtr == sentPtr)
2121  {
2122  if (fullyAppliedLastTime)
2123  clearLagTimes = true;
2124  fullyAppliedLastTime = true;
2125  }
2126  else
2127  fullyAppliedLastTime = false;
2128 
2129  /* Send a reply if the standby requested one. */
2130  if (replyRequested)
2131  WalSndKeepalive(false, InvalidXLogRecPtr);
2132 
2133  /*
2134  * Update shared state for this WalSender process based on reply data from
2135  * standby.
2136  */
2137  {
2138  WalSnd *walsnd = MyWalSnd;
2139 
2140  SpinLockAcquire(&walsnd->mutex);
2141  walsnd->write = writePtr;
2142  walsnd->flush = flushPtr;
2143  walsnd->apply = applyPtr;
2144  if (writeLag != -1 || clearLagTimes)
2145  walsnd->writeLag = writeLag;
2146  if (flushLag != -1 || clearLagTimes)
2147  walsnd->flushLag = flushLag;
2148  if (applyLag != -1 || clearLagTimes)
2149  walsnd->applyLag = applyLag;
2150  walsnd->replyTime = replyTime;
2151  SpinLockRelease(&walsnd->mutex);
2152  }
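2153 
2154  if (!am_cascading_walsender)
2155  SyncRepReleaseWaiters();
2156 
2157  /*
2158  * Advance our local xmin horizon when the client confirmed a flush.
2159  */
2160  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2161  {
2162  if (SlotIsLogical(MyReplicationSlot))
2163  LogicalConfirmReceivedLocation(flushPtr);
2164  else
2165  PhysicalConfirmReceivedLocation(flushPtr);
2166  }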
2167 }
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1815
#define SlotIsLogical(slot)
Definition: slot.h:191
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:432
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:155
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2034
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:3695
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3821

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog(), WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 478 of file walsender.c.

479 {
480 #define READ_REPLICATION_SLOT_COLS 3
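481  ReplicationSlot *slot;
482  DestReceiver *dest;
483  TupOutputState *tstate;
484  TupleDesc tupdesc;
485  Datum values[READ_REPLICATION_SLOT_COLS];
486  bool nulls[READ_REPLICATION_SLOT_COLS];
487 
488  tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
489  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",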
490  TEXTOID, -1, 0);
491  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
492  TEXTOID, -1, 0);
493  /* TimeLineID is unsigned, so int4 is not wide enough. */
494  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
495  INT8OID, -1, 0);
496 
497  memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
498 
499  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
500  slot = SearchNamedReplicationSlot(cmd->slotname, false);
501  if (slot == NULL || !slot->in_use)
502  {
503  LWLockRelease(ReplicationSlotControlLock);
504  }
505  else
506  {
507  ReplicationSlot slot_contents;
508  int i = 0;
509 
510  /* Copy slot contents while holding spinlock */
511  SpinLockAcquire(&slot->mutex);
512  slot_contents = *slot;
513  SpinLockRelease(&slot->mutex);
514  LWLockRelease(ReplicationSlotControlLock);
515 
516  if (OidIsValid(slot_contents.data.database))
517  ereport(ERROR,
518  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519  errmsg("cannot use %s with a logical replication slot",
520  "READ_REPLICATION_SLOT"));
521 
522  /* slot type */
523  values[i] = CStringGetTextDatum("physical");
524  nulls[i] = false;
525  i++;
526 
527  /* start LSN */
528  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
529  {
530  char xloc[64];
531 
532  snprintf(xloc, sizeof(xloc), "%X/%X",
533  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
534  values[i] = CStringGetTextDatum(xloc);
535  nulls[i] = false;
536  }
537  i++;
538 
539  /* timeline this WAL was produced on */
540  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
541  {
542  TimeLineID slots_position_timeline;
543  TimeLineID current_timeline;
544  List *timeline_history = NIL;
545 
546  /*
547  * While in recovery, use as timeline the currently-replaying one
548  * to get the LSN position's history.
549  */
550  if (RecoveryInProgress())
551  (void) GetXLogReplayRecPtr(&current_timeline);
552  else
553  current_timeline = GetWALInsertionTimeLine();
554 
555  timeline_history = readTimeLineHistory(current_timeline);
556  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
557  timeline_history);
558  values[i] = Int64GetDatum((int64) slots_position_timeline);
559  nulls[i] = false;
560  }
561  i++;
562 
563  Assert(i == READ_REPLICATION_SLOT_COLS);
564  }
565 
566  dest = CreateDestReceiver(DestRemoteSimple);
567  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
568  do_tup_output(tstate, values, nulls);
569  end_tup_output(tstate);
570 }
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
#define OidIsValid(objectId)
Definition: c.h:764
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
@ LW_SHARED
Definition: lwlock.h:117
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:376
Definition: pg_list.h:54
bool in_use
Definition: slot.h:138
#define READ_REPLICATION_SLOT_COLS

References Assert(), begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 577 of file walsender.c.

578 {
579  DestReceiver *dest;
580  TupleDesc tupdesc;
581  StringInfoData buf;
582  char histfname[MAXFNAMELEN];
583  char path[MAXPGPATH];
584  int fd;
585  off_t histfilelen;
586  off_t bytesleft;
587  Size len;
588 
589  dest = CreateDestReceiver(DestRemoteSimple);
590 
591  /*
592  * Reply with a result set with one row, and two columns. The first col is
593  * the name of the history file, 2nd is the contents.
594  */
595  tupdesc = CreateTemplateTupleDesc(2);
596  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
597  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
598 
599  TLHistoryFileName(histfname, cmd->timeline);
600  TLHistoryFilePath(path, cmd->timeline);
601 
602  /* Send a RowDescription message */
603  dest->rStartup(dest, CMD_SELECT, tupdesc);
604 
605  /* Send a DataRow message */
606  pq_beginmessage(&buf, PqMsg_DataRow);
607  pq_sendint16(&buf, 2); /* # of columns */
608  len = strlen(histfname);
609  pq_sendint32(&buf, len); /* col1 len */
610  pq_sendbytes(&buf, histfname, len);
611 
612  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
613  if (fd < 0)
614  ereport(ERROR,
615  (errcode_for_file_access(),
616  errmsg("could not open file \"%s\": %m", path)));
617 
618  /* Determine file length and send it to client */
619  histfilelen = lseek(fd, 0, SEEK_END);
620  if (histfilelen < 0)
621  ereport(ERROR,
622  (errcode_for_file_access(),
623  errmsg("could not seek to end of file \"%s\": %m", path)));
624  if (lseek(fd, 0, SEEK_SET) != 0)
625  ereport(ERROR,
626  (errcode_for_file_access(),
627  errmsg("could not seek to beginning of file \"%s\": %m", path)));
628 
629  pq_sendint32(&buf, histfilelen); /* col2 len */
630 
631  bytesleft = histfilelen;
632  while (bytesleft > 0)
633  {
634  PGAlignedBlock rbuf;
635  int nread;
636 
637  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
638  nread = read(fd, rbuf.data, sizeof(rbuf));
639  pgstat_report_wait_end();
640  if (nread < 0)
641  ereport(ERROR,
642  (errcode_for_file_access(),
643  errmsg("could not read file \"%s\": %m",
644  path)));
645  else if (nread == 0)
646  ereport(ERROR,
647  (errcode(ERRCODE_DATA_CORRUPTED),
648  errmsg("could not read file \"%s\": read %d of %zu",
649  path, nread, (Size) bytesleft)));
650 
651  pq_sendbytes(&buf, rbuf.data, nread);
652  bytesleft -= nread;
653  }
654 
655  if (CloseTransientFile(fd) != 0)
656  ereport(ERROR,
657  (errcode_for_file_access(),
658  errmsg("could not close file \"%s\": %m", path)));
659 
660  pq_endmessage(&buf);
661 }
#define PG_BINARY
Definition: c.h:1283
size_t Size
Definition: c.h:594
int errcode_for_file_access(void)
Definition: elog.c:881
int CloseTransientFile(int fd)
Definition: fd.c:2754
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2578
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:276
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
static char * buf
Definition: pg_test_fsync.c:67
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PqMsg_DataRow
Definition: protocol.h:43
TimeLineID timeline
Definition: replnodes.h:108
char data[BLCKSZ]
Definition: c.h:1132
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, generate_unaccent_rules::dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1254 of file walsender.c.

1255 {
1256  StringInfoData buf;
1257  QueryCompletion qc;
1258 
1259  /* make sure that our requirements are still fulfilled */
1260  CheckLogicalDecodingRequirements();
1261 
1262  Assert(!MyReplicationSlot);
1263 
1264  ReplicationSlotAcquire(cmd->slotname, true);
1265 
1266  /*
1267  * Force a disconnect, so that the decoding code doesn't need to care
1268  * about an eventual switch from running in recovery, to running in a
1269  * normal environment. Client code is expected to handle reconnects.
1270  */
1271  if (am_cascading_walsender && !RecoveryInProgress())
1272  {
1273  ereport(LOG,
1274  (errmsg("terminating walsender process after promotion")));
1275  got_STOPPING = true;
1276  }
1277 
1278  /*
1279  * Create our decoding context, making it start at the previously ack'ed
1280  * position.
1281  *
1282  * Do this before sending a CopyBothResponse message, so that any errors
1283  * are reported early.
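1284  */
1285  logical_decoding_ctx =
1286  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1287  XL_ROUTINE(.page_read = logical_read_xlog_page,
1288  .segment_open = WalSndSegmentOpen,
1289  .segment_close = wal_segment_close),
1290  WalSndPrepareWrite, WalSndWriteData,
1291  WalSndUpdateProgress);
1292  xlogreader = logical_decoding_ctx->reader;
1293 
1294  WalSndSetState(WALSNDSTATE_CATCHUP);
1295 
1296  /* Send a CopyBothResponse message, and start streaming */
1297  pq_beginmessage(&buf, PqMsg_CopyBothResponse);
1298  pq_sendbyte(&buf, 0);
1299  pq_sendint16(&buf, 0);
1300  pq_endmessage(&buf);
1301  pq_flush();
1302 
1303  /* Start reading WAL from the oldest required WAL. */
1304  XLogBeginRead(logical_decoding_ctx->reader,
1305  MyReplicationSlot->data.restart_lsn);
1306 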
1307  /*
1308  * Report the location after which we'll send out further commits as the
1309  * current sentPtr.
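1310  */
1311  sentPtr = MyReplicationSlot->data.confirmed_flush;
1312 
1313  /* Also update the sent position status in shared memory */
1314  SpinLockAcquire(&MyWalSnd->mutex);
1315  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1316  SpinLockRelease(&MyWalSnd->mutex);
1317 
1318  replication_active = true;
1319 
1320  SyncRepInitConfig();
1321 
1322  /* Main loop of walsender */
1323  WalSndLoop(XLogSendLogical);
1324 
1325  FreeDecodingContext(logical_decoding_ctx);
1326  ReplicationSlotRelease();
1327 
1328  replication_active = false;
1329  if (got_STOPPING)
1330  proc_exit(0);
1331  WalSndSetState(WALSNDSTATE_STARTUP);
1332 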
1333  /* Get out of COPY mode (CommandComplete). */
1334  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1335  EndCommand(&qc, DestRemote, false);
1336 }
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:38
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:494
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:452
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:85
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2446
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static void XLogSendLogical(void)
Definition: walsender.c:3045
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:248

References am_cascading_walsender, Assert(), buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 670 of file walsender.c.

671 {
673  XLogRecPtr FlushPtr;
674  TimeLineID FlushTLI;
675 
676  /* create xlogreader for physical replication */
677  xlogreader =
679  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
680  .segment_close = wal_segment_close),
681  NULL);
682 
683  if (!xlogreader)
684  ereport(ERROR,
685  (errcode(ERRCODE_OUT_OF_MEMORY),
686  errmsg("out of memory"),
687  errdetail("Failed while allocating a WAL reading processor.")));
688 
689  /*
690  * We assume here that we're logging enough information in the WAL for
691  * log-shipping, since this is checked in PostmasterMain().
692  *
693  * NOTE: wal_level can only change at shutdown, so in most cases it is
694  * difficult for there to be WAL data that we can still see that was
695  * written at wal_level='minimal'.
696  */
697 
698  if (cmd->slotname)
699  {
700  ReplicationSlotAcquire(cmd->slotname, true);
702  ereport(ERROR,
703  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
704  errmsg("cannot use a logical replication slot for physical replication")));
705 
706  /*
707  * We don't need to verify the slot's restart_lsn here; instead we
708  * rely on the caller requesting the starting point to use. If the
709  * WAL segment doesn't exist, we'll fail later.
710  */
711  }
712 
713  /*
714  * Select the timeline. If it was given explicitly by the client, use
715  * that. Otherwise use the timeline of the last replayed record.
716  */
719  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
720  else
721  FlushPtr = GetFlushRecPtr(&FlushTLI);
722 
723  if (cmd->timeline != 0)
724  {
725  XLogRecPtr switchpoint;
726 
727  sendTimeLine = cmd->timeline;
728  if (sendTimeLine == FlushTLI)
729  {
730  sendTimeLineIsHistoric = false;
732  }
733  else
734  {
735  List *timeLineHistory;
736 
737  sendTimeLineIsHistoric = true;
738 
739  /*
740  * Check that the timeline the client requested exists, and the
741  * requested start location is on that timeline.
742  */
743  timeLineHistory = readTimeLineHistory(FlushTLI);
744  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
746  list_free_deep(timeLineHistory);
747 
748  /*
749  * Found the requested timeline in the history. Check that
750  * requested startpoint is on that timeline in our history.
751  *
752  * This is quite loose on purpose. We only check that we didn't
753  * fork off the requested timeline before the switchpoint. We
754  * don't check that we switched *to* it before the requested
755  * starting point. This is because the client can legitimately
756  * request to start replication from the beginning of the WAL
757  * segment that contains switchpoint, but on the new timeline, so
758  * that it doesn't end up with a partial segment. If you ask for
759  * too old a starting point, you'll get an error later when we
760  * fail to find the requested WAL segment in pg_wal.
761  *
762  * XXX: we could be more strict here and only allow a startpoint
763  * that's older than the switchpoint, if it's still in the same
764  * WAL segment.
765  */
766  if (!XLogRecPtrIsInvalid(switchpoint) &&
767  switchpoint < cmd->startpoint)
768  {
769  ereport(ERROR,
770  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
772  cmd->timeline),
773  errdetail("This server's history forked from timeline %u at %X/%X.",
774  cmd->timeline,
775  LSN_FORMAT_ARGS(switchpoint))));
776  }
777  sendTimeLineValidUpto = switchpoint;
778  }
779  }
780  else
781  {
782  sendTimeLine = FlushTLI;
784  sendTimeLineIsHistoric = false;
785  }
786 
788 
789  /* If there is nothing to stream, don't even enter COPY mode */
791  {
792  /*
793  * When we first start replication the standby will be behind the
794  * primary. For some applications, for example synchronous
795  * replication, it is important to have a clear state for this initial
796  * catchup mode, so we can trigger actions when we change streaming
797  * state later. We may stay in this state for a long time, which is
798  * exactly why we want to be able to monitor whether or not we are
799  * still here.
800  */
802 
803  /* Send a CopyBothResponse message, and start streaming */
805  pq_sendbyte(&buf, 0);
806  pq_sendint16(&buf, 0);
807  pq_endmessage(&buf);
808  pq_flush();
809 
810  /*
811  * Don't allow a request to stream from a future point in WAL that
812  * hasn't been flushed to disk in this server yet.
813  */
814  if (FlushPtr < cmd->startpoint)
815  {
816  ereport(ERROR,
817  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
819  LSN_FORMAT_ARGS(FlushPtr))));
820  }
821 
822  /* Start streaming from the requested point */
823  sentPtr = cmd->startpoint;
824 
825  /* Initialize shared memory status, too */
829 
831 
832  /* Main loop of walsender */
833  replication_active = true;
834 
836 
837  replication_active = false;
838  if (got_STOPPING)
839  proc_exit(0);
841 
843  }
844 
845  if (cmd->slotname)
847 
848  /*
849  * Copy is finished now. Send a single-row result set indicating the next
850  * timeline.
851  */
853  {
854  char startpos_str[8 + 1 + 8 + 1];
856  TupOutputState *tstate;
857  TupleDesc tupdesc;
858  Datum values[2];
859  bool nulls[2] = {0};
860 
861  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
863 
865 
866  /*
867  * Need a tuple descriptor representing two columns. int8 may seem
868  * like a surprising data type for this, but in theory int4 would not
869  * be wide enough for this, as TimeLineID is unsigned.
870  */
871  tupdesc = CreateTemplateTupleDesc(2);
872  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
873  INT8OID, -1, 0);
874  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
875  TEXTOID, -1, 0);
876 
877  /* prepare for projection of tuple */
878  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
879 
881  values[1] = CStringGetTextDatum(startpos_str);
882 
883  /* send it to dest */
884  do_tup_output(tstate, values, nulls);
885 
886  end_tup_output(tstate);
887  }
888 
889  /* Send CommandComplete message */
890  EndReplicationCommand("START_STREAMING");
891 }
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1202
void list_free_deep(List *list)
Definition: list.c:1559
TimeLineID timeline
Definition: replnodes.h:84
static void XLogSendPhysical(void)
Definition: walsender.c:2745
int wal_segment_size
Definition: xlog.c:146
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:108

References am_cascading_walsender, Assert(), begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2220 of file walsender.c.

/*
 * TransactionIdInRecentPast
 *
 * Decide whether the (xid, epoch) pair is consistent with the next full
 * transaction ID reported by ReadNextFullTransactionId().
 *
 * Returns false when the supplied epoch does not match the epoch implied by
 * the next full XID, or when xid does not precede-or-equal the next XID
 * according to TransactionIdPrecedesOrEquals(); returns true otherwise.
 */
2221 {
2222  FullTransactionId nextFullXid;
2223  TransactionId nextXid;
2224  uint32 nextEpoch;
2225 
      /* Split the next full XID into its 32-bit XID and epoch parts. */
2226  nextFullXid = ReadNextFullTransactionId();
2227  nextXid = XidFromFullTransactionId(nextFullXid);
2228  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2229 
      /*
       * An xid numerically at or below nextXid must carry the same epoch as
       * nextXid; an xid numerically above it must carry the epoch just
       * before that one.
       */
2230  if (xid <= nextXid)
2231  {
2232  if (epoch != nextEpoch)
2233  return false;
2234  }
2235  else
2236  {
2237  if (epoch + 1 != nextEpoch)
2238  return false;
2239  }
2240 
2241  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2242  return false; /* epoch OK, but it's wrapped around */
2243 
2244  return true;
2245 }
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2419 of file walsender.c.

2420 {
2421  TimestampTz timeout;
2422 
2423  /* don't bail out if we're doing something that doesn't require timeouts */
2424  if (last_reply_timestamp <= 0)
2425  return;
2426 
2429 
2430  if (wal_sender_timeout > 0 && last_processing >= timeout)
2431  {
2432  /*
2433  * Since typically expiration of replication timeout means
2434  * communication problem, we don't send the error message to the
2435  * standby.
2436  */
2438  (errmsg("terminating walsender process due to replication timeout")));
2439 
2440  WalSndShutdown();
2441  }
2442 }
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:124

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2375 of file walsender.c.

2376 {
2377  long sleeptime = 10000; /* 10 s */
2378 
2380  {
2381  TimestampTz wakeup_time;
2382 
2383  /*
2384  * At the latest stop sleeping once wal_sender_timeout has been
2385  * reached.
2386  */
2389 
2390  /*
2391  * If no ping has been sent yet, wakeup when it's time to do so.
2392  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2393  * the timeout passed without a response.
2394  */
2397  wal_sender_timeout / 2);
2398 
2399  /* Compute relative time until wakeup. */
2400  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2401  }
2402 
2403  return sleeptime;
2404 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1695

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3130 of file walsender.c.

3131 {
3132  XLogRecPtr replicatedPtr;
3133 
3134  /* ... let's just be real sure we're caught up ... */
3135  send_data();
3136 
3137  /*
3138  * To figure out whether all WAL has successfully been replicated, check
3139  * flush location if valid, write otherwise. Tools like pg_receivewal will
3140  * usually (unless in synchronous mode) return an invalid flush location.
3141  */
3142  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3144 
3145  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3146  !pq_is_send_pending())
3147  {
3148  QueryCompletion qc;
3149 
3150  /* Inform the standby that XLOG streaming is done */
3151  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3152  EndCommand(&qc, DestRemote, false);
3153  pq_flush();
3154 
3155  proc_exit(0);
3156  }
3159 }
static bool WalSndCaughtUp
Definition: walsender.c:184

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 315 of file walsender.c.

316 {
320 
321  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
323 
324  if (MyReplicationSlot != NULL)
326 
328 
329  replication_active = false;
330 
331  /*
332  * If there is a transaction in progress, it will clean up our
333  * ResourceOwner, but if a replication command set up a resource owner
334  * without a transaction, we've got to clean that up now.
335  */
337  WalSndResourceCleanup(false);
338 
339  if (got_STOPPING || got_SIGUSR2)
340  proc_exit(0);
341 
342  /* Revert back to startup state */
344 }
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1903
void ReplicationSlotCleanup(void)
Definition: slot.c:605
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:350
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4834

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3489 of file walsender.c.

/*
 * WalSndGetStateString
 *
 * Map a WalSndState value to a constant, human-readable string.
 *
 * Returns a string literal for each state listed in the switch, and
 * "UNKNOWN" for any value that matches none of the cases.
 */
3490 {
3491  switch (state)
3492  {
3493  case WALSNDSTATE_STARTUP:
3494  return "startup";
3495  case WALSNDSTATE_BACKUP:
3496  return "backup";
3497  case WALSNDSTATE_CATCHUP:
3498  return "catchup";
3499  case WALSNDSTATE_STREAMING:
3500  return "streaming";
3501  case WALSNDSTATE_STOPPING:
3502  return "stopping";
3503  }
      /* Reached only when state matches none of the cases above. */
3504  return "UNKNOWN";
3505 }
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3406 of file walsender.c.

3407 {
3408  int i;
3409 
3410  for (i = 0; i < max_wal_senders; i++)
3411  {
3412  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3413  pid_t pid;
3414 
3415  SpinLockAcquire(&walsnd->mutex);
3416  pid = walsnd->pid;
3417  SpinLockRelease(&walsnd->mutex);
3418 
3419  if (pid == 0)
3420  continue;
3421 
3423  }
3424 }
#define InvalidBackendId
Definition: backendid.h:23
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 3695 of file walsender.c.

3696 {
3697  elog(DEBUG2, "sending replication keepalive");
3698 
3699  /* construct the message... */
3701  pq_sendbyte(&output_message, 'k');
3702  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3704  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3705 
3706  /* ... and send it wrapped in CopyData */
3708 
3709  /* Set local flag */
3710  if (requestReply)
3712 }
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153

References StringInfoData::data, DEBUG2, elog(), GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3718 of file walsender.c.

3719 {
3720  TimestampTz ping_time;
3721 
3722  /*
3723  * Don't send keepalive messages if timeouts are globally disabled or
3724  * we're doing something not partaking in timeouts.
3725  */
3726  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3727  return;
3728 
3730  return;
3731 
3732  /*
3733  * If half of wal_sender_timeout has lapsed without receiving any reply
3734  * from the standby, send a keep-alive message to the standby requesting
3735  * an immediate reply.
3736  */
3738  wal_sender_timeout / 2);
3739  if (last_processing >= ping_time)
3740  {
3742 
3743  /* Try to flush pending output to the client */
3744  if (pq_flush_if_writable() != 0)
3745  WalSndShutdown();
3746  }
3747 }

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2649 of file walsender.c.

/*
 * WalSndKill
 *
 * Give up this process's WalSnd entry: clear the process-local MyWalSnd
 * pointer, then reset the entry's latch and pid while holding its spinlock.
 *
 * Neither "code" nor "arg" is used by the body.
 * NOTE(review): the (int, Datum) signature suggests this is registered as an
 * exit callback -- confirm against the caller.
 */
2650 {
2651  WalSnd *walsnd = MyWalSnd;
2652 
2653  Assert(walsnd != NULL);
2654 
      /* Drop the process-local pointer first; the saved copy is used below. */
2655  MyWalSnd = NULL;
2656 
2657  SpinLockAcquire(&walsnd->mutex);
2658  /* clear latch while holding the spinlock, so it can safely be read */
2659  walsnd->latch = NULL;
2660  /* Mark WalSnd struct as no longer being in use. */
2661  walsnd->pid = 0;
2662  SpinLockRelease(&walsnd->mutex);
2663 }

References Assert(), WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3246 of file walsender.c.

/*
 * WalSndLastCycleHandler
 *
 * Signal handler: record that the signal arrived by setting got_SIGUSR2 and
 * set this process's latch.  errno is saved on entry and restored on exit,
 * so the code that was interrupted sees it unchanged.
 */
3247 {
3248  int save_errno = errno;
3249 
3250  got_SIGUSR2 = true;
3251  SetLatch(MyLatch);
3252 
3253  errno = save_errno;
3254 }

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2446 of file walsender.c.

2447 {
2448  /*
2449  * Initialize the last reply timestamp. That enables timeout processing
2450  * from hereon.
2451  */
2453  waiting_for_ping_response = false;
2454 
2455  /*
2456  * Loop until we reach the end of this timeline or the client requests to
2457  * stop streaming.
2458  */
2459  for (;;)
2460  {
2461  /* Clear any already-pending wakeups */
2463 
2465 
2466  /* Process any requests or signals received recently */
2467  if (ConfigReloadPending)
2468  {
2469  ConfigReloadPending = false;
2472  }
2473 
2474  /* Check for input from the client */
2476 
2477  /*
2478  * If we have received CopyDone from the client, sent CopyDone
2479  * ourselves, and the output buffer is empty, it's time to exit
2480  * streaming.
2481  */
2483  !pq_is_send_pending())
2484  break;
2485 
2486  /*
2487  * If we don't have any pending data in the output buffer, try to send
2488  * some more. If there is some, we don't bother to call send_data
2489  * again until we've flushed it ... but we'd better assume we are not
2490  * caught up.
2491  */
2492  if (!pq_is_send_pending())
2493  send_data();
2494  else
2495  WalSndCaughtUp = false;
2496 
2497  /* Try to flush pending output to the client */
2498  if (pq_flush_if_writable() != 0)
2499  WalSndShutdown();
2500 
2501  /* If nothing remains to be sent right now ... */
2503  {
2504  /*
2505  * If we're in catchup state, move to streaming. This is an
2506  * important state change for users to know about, since before
2507  * this point data loss might occur if the primary dies and we
2508  * need to failover to the standby. The state change is also
2509  * important for synchronous replication, since commits that
2510  * started to wait at that point might wait for some time.
2511  */
2513  {
2514  ereport(DEBUG1,
2515  (errmsg_internal("\"%s\" has now caught up with upstream server",
2516  application_name)));
2518  }
2519 
2520  /*
2521  * When SIGUSR2 arrives, we send any outstanding logs up to the
2522  * shutdown checkpoint record (i.e., the latest record), wait for
2523  * them to be replicated to the standby, and exit. This may be a
2524  * normal termination at shutdown, or a promotion, the walsender
2525  * is not sure which.
2526  */
2527  if (got_SIGUSR2)
2528  WalSndDone(send_data);
2529  }
2530 
2531  /* Check for replication timeout. */
2533 
2534  /* Send keepalive if the time has come */
2536 
2537  /*
2538  * Block if we have unsent data. XXX For logical replication, let
2539  * WalSndWaitForWal() handle any other blocking; idle receivers need
2540  * its additional actions. For physical replication, also block if
2541  * caught up; its send_data does not block.
2542  */
2543  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2546  {
2547  long sleeptime;
2548  int wakeEvents;
2549 
2551  wakeEvents = WL_SOCKET_READABLE;
2552  else
2553  wakeEvents = 0;
2554 
2555  /*
2556  * Use fresh timestamp, not last_processing, to reduce the chance
2557  * of reaching wal_sender_timeout before sending a keepalive.
2558  */
2560 
2561  if (pq_is_send_pending())
2562  wakeEvents |= WL_SOCKET_WRITEABLE;
2563 
2564  /* Sleep until something happens or we time out */
2565  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2566  }
2567  }
2568 }
char * application_name
Definition: guc_tables.c:541
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3130

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1347 of file walsender.c.

/*
 * WalSndPrepareWrite
 *
 * Start a new output message in ctx->out: reset the buffer and append the
 * message header, namely a 'w' byte, the LSN twice (dataStart and walEnd)
 * and a zero placeholder for the send time.
 *
 * When last_write is false the LSN written is InvalidXLogRecPtr instead of
 * the caller's lsn.  The xid argument is not used by the body.
 */
1348 {
1349  /* can't have sync rep confused by sending the same LSN several times */
1350  if (!last_write)
1351  lsn = InvalidXLogRecPtr;
1352 
1353  resetStringInfo(ctx->out);
1354 
1355  pq_sendbyte(ctx->out, 'w');
1356  pq_sendint64(ctx->out, lsn); /* dataStart */
1357  pq_sendint64(ctx->out, lsn); /* walEnd */
1358 
1359  /*
1360  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1361  * reserve space here.
1362  */
1363  pq_sendint64(ctx->out, 0); /* sendtime */
1364 }
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 350 of file walsender.c.

/*
 * WalSndResourceCleanup
 *
 * Release everything held by CurrentResourceOwner and then delete it.
 * Does nothing when CurrentResourceOwner is NULL.
 *
 * isCommit is passed unchanged to each ResourceOwnerRelease() call.  The
 * release runs in three phases, in this order: BEFORE_LOCKS, LOCKS,
 * AFTER_LOCKS.
 */
351 {
352  ResourceOwner resowner;
353 
354  if (CurrentResourceOwner == NULL)
355  return;
356 
357  /*
358  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359  * in a local variable and clear it first.
360  */
361  resowner = CurrentResourceOwner;
362  CurrentResourceOwner = NULL;
363 
364  /* Now we can release resources and delete it. */
365  ResourceOwnerRelease(resowner,
366  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367  ResourceOwnerRelease(resowner,
368  RESOURCE_RELEASE_LOCKS, isCommit, true);
369  ResourceOwnerRelease(resowner,
370  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371  ResourceOwnerDelete(resowner);
372 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:147
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:488
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:762
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:49
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:48
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:50

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3201 of file walsender.c.

/*
 * WalSndRqstFileReload
 *
 * Set the needreload flag on every WalSnd entry whose pid is nonzero.
 * Each entry is examined and updated while holding that entry's spinlock;
 * entries with pid == 0 are skipped without being modified.
 */
3202 {
3203  int i;
3204 
3205  for (i = 0; i < max_wal_senders; i++)
3206  {
3207  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3208 
3209  SpinLockAcquire(&walsnd->mutex);
3210  if (walsnd->pid == 0)
3211  {
       /* Release the lock before moving on to the next entry. */
3212  SpinLockRelease(&walsnd->mutex);
3213  continue;
3214  }
3215  walsnd->needreload = true;
3216  SpinLockRelease(&walsnd->mutex);
3217  }
3218 }

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2667 of file walsender.c.

2669 {
2670  char path[MAXPGPATH];
2671 
2672  /*-------
2673  * When reading from a historic timeline, and there is a timeline switch
2674  * within this segment, read from the WAL segment belonging to the new
2675  * timeline.
2676  *
2677  * For example, imagine that this server is currently on timeline 5, and
2678  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2679  * 0/13002088. In pg_wal, we have these files:
2680  *
2681  * ...
2682  * 000000040000000000000012
2683  * 000000040000000000000013
2684  * 000000050000000000000013
2685  * 000000050000000000000014
2686  * ...
2687  *
2688  * In this situation, when requested to send the WAL from segment 0x13, on
2689  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2690  * recovery prefers files from newer timelines, so if the segment was
2691  * restored from the archive on this server, the file belonging to the old
2692  * timeline, 000000040000000000000013, might not exist. Their contents are
2693  * equal up to the switchpoint, because at a timeline switch, the used
2694  * portion of the old segment is copied to the new file.
2695  */
2696  *tli_p = sendTimeLine;
2698  {
2699  XLogSegNo endSegNo;
2700 
2701  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2702  if (nextSegNo == endSegNo)
2703  *tli_p = sendTimeLineNextTLI;
2704  }
2705 
2706  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2707  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2708  if (state->seg.ws_file >= 0)
2709  return;
2710 
2711  /*
2712  * If the file is not found, assume it's because the standby asked for a
2713  * too old WAL segment that has already been removed or recycled.
2714  */
2715  if (errno == ENOENT)
2716  {
2717  char xlogfname[MAXFNAMELEN];
2718  int save_errno = errno;
2719 
2720  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2721  errno = save_errno;
2722  ereport(ERROR,
2724  errmsg("requested WAL segment %s has already been removed",
2725  xlogfname)));
2726  }
2727  else
2728  ereport(ERROR,
2730  errmsg("could not open file \"%s\": %m",
2731  path)));
2732 }
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1039
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3470 of file walsender.c.

3471 {
3472  WalSnd *walsnd = MyWalSnd;
3473 
3475 
3476  if (walsnd->state == state)
3477  return;
3478 
3479  SpinLockAcquire(&walsnd->mutex);
3480  walsnd->state = state;
3481  SpinLockRelease(&walsnd->mutex);
3482 }

References am_walsender, Assert(), WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3289 of file walsender.c.

3290 {
3291  bool found;
3292  int i;
3293 
3294  WalSndCtl = (WalSndCtlData *)
3295  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3296 
3297  if (!found)
3298  {
3299  /* First time through, so initialize */
3301 
3302  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3304 
3305  for (i = 0; i < max_wal_senders; i++)
3306  {
3307  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3308 
3309  SpinLockInit(&walsnd->mutex);
3310  }
3311 
3314  }
3315 }
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
#define SpinLockInit(lock)
Definition: spin.h:60
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3277

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3277 of file walsender.c.

/*
 * WalSndShmemSize
 *
 * Compute the number of bytes needed for the WalSndCtlData structure: the
 * fixed part up to the walsnds member, plus max_wal_senders WalSnd entries.
 */
3278 {
3279  Size size = 0;
3280 
      /* Fixed part, then the array of per-walsender entries. */
3281  size = offsetof(WalSndCtlData, walsnds);
3282  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3283 
3284  return size;
3285 }
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 230 of file walsender.c.

267 {
269 
270  /* Create a per-walsender data structure in shared memory */
272 
273  /*
274  * We don't currently need any ResourceOwner in a walsender process, but
275  * if we did, we could call CreateAuxProcessResourceOwner here.
276  */
277 
278  /*
279  * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
280  * a WAL sender process, postmaster will let us outlive the bgwriter and
281  * kill us last in the shutdown sequence, so we get a chance to stream all
282  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
283  * there's no going back, and we mustn't write any WAL records after this.
284  */
287 
288  /*
289  * If the client didn't specify a database to connect to, show in PGPROC
290  * that our advertised xmin should affect vacuum horizons in all
291  * databases. This allows physical replication clients to send hot
292  * standby feedback that will delay vacuum cleanup in all databases.
293  */
294  if (MyDatabaseId == InvalidOid)
295  {
297  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
300  LWLockRelease(ProcArrayLock);
301  }
302 
303  /* Initialize empty timestamp buffer for lag tracking. */
305 }
@ LW_EXCLUSIVE
Definition: lwlock.h:116
MemoryContext TopMemoryContext
Definition: mcxt.c:141
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1064
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:339
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:61
PROC_HDR * ProcGlobal
Definition: proc.c:78
uint8 statusFlags
Definition: proc.h:233
int pgxactoff
Definition: proc.h:188
uint8 * statusFlags
Definition: proc.h:377
static void InitWalSenderSlot(void)
Definition: walsender.c:2572

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3258 of file walsender.c.

3259 {
3260  /* Set up signal handlers */
3262  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3263  pqsignal(SIGTERM, die); /* request shutdown */
3264  /* SIGQUIT handler was already set up by InitPostmasterChild */
3265  InitializeTimeouts(); /* establishes SIGALRM handler */
3268  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3269  * shutdown */
3270 
3271  /* Reset some signals that are accepted by postmaster but not here */
3273 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
Definition: pg_test_fsync.c:95
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3008
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:639
void InitializeTimeouts(void)
Definition: timeout.c:474
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3246
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1470 of file walsender.c.

1472 {
1473  static TimestampTz sendTime = 0;
1475  bool pending_writes = false;
1476  bool end_xact = ctx->end_xact;
1477 
1478  /*
1479  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1480  * avoid flooding the lag tracker when we commit frequently.
1481  *
1482  * We don't have a mechanism to get the ack for any LSN other than end
1483  * xact LSN from the downstream. So, we track lag only for end of
1484  * transaction LSN.
1485  */
1486 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1487  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1489  {
1490  LagTrackerWrite(lsn, now);
1491  sendTime = now;
1492  }
1493 
1494  /*
1495  * When skipping empty transactions in synchronous replication, we send a
1496  * keepalive message to avoid delaying such transactions.
1497  *
1498  * It is okay to check sync_standbys_defined flag without lock here as in
1499  * the worst case we will just send an extra keepalive message when it is
1500  * really not required.
1501  */
1502  if (skipped_xact &&
1503  SyncRepRequested() &&
1504  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1505  {
1506  WalSndKeepalive(false, lsn);
1507 
1508  /* Try to flush pending output to the client */
1509  if (pq_flush_if_writable() != 0)
1510  WalSndShutdown();
1511 
1512  /* If we have pending write here, make sure it's actually flushed */
1513  if (pq_is_send_pending())
1514  pending_writes = true;
1515  }
1516 
1517  /*
1518  * Process pending writes if any or try to send a keepalive if required.
1519  * We don't need to try sending keep alive messages at the transaction end
1520  * as that will be done at a later point in time. This is required only
1521  * for large transactions where we don't send any changes to the
1522  * downstream and the receiver can timeout due to that.
1523  */
1524  if (pending_writes || (!end_xact &&
1526  wal_sender_timeout / 2)))
1528 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1719
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1416
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3756

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3354 of file walsender.c.

3355 {
3356  WaitEvent event;
3357 
3358  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3359 
3360  /*
3361  * We use a condition variable to efficiently wake up walsenders in
3362  * WalSndWakeup().
3363  *
3364  * Every walsender prepares to sleep on a shared memory CV. Note that it
3365  * just prepares to sleep on the CV (i.e., adds itself to the CV's
3366  * waitlist), but does not actually wait on the CV (IOW, it never calls
3367  * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3368  * waiting, because we also need to wait for socket events. The processes
3369  * (startup process, walreceiver etc.) wanting to wake up walsenders use
3370  * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3371  * walsenders come out of WaitEventSetWait().
3372  *
3373  * This approach is simple and efficient because one doesn't have to loop
3374  * through all the walsender slots, with a spinlock acquisition and
3375  * release for every iteration, just to wake up only the waiting
3376  * walsenders. It makes WalSndWakeup() callers' life easy.
3377  *
3378  * XXX: A desirable future improvement would be to add support for CVs
3379  * into WaitEventSetWait().
3380  *
3381  * And, we use separate shared memory CVs for physical and logical
3382  * walsenders for selective wake ups, see WalSndWakeup() for more details.
3383  */
3386  else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
3388 
3389  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3390  (event.events & WL_POSTMASTER_DEATH))
3391  {
3393  proc_exit(1);
3394  }
3395 
3397 }
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:1006
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1381
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:164
uint32 events
Definition: latch.h:153

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1538 of file walsender.c.

1539 {
1540  int wakeEvents;
1541  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1542 
1543  /*
1544  * Fast path to avoid acquiring the spinlock in case we already know we
1545  * have enough WAL available. This is particularly interesting if we're
1546  * far behind.
1547  */
1548  if (RecentFlushPtr != InvalidXLogRecPtr &&
1549  loc <= RecentFlushPtr)
1550  return RecentFlushPtr;
1551 
1552  /* Get a more recent flush pointer. */
1553  if (!RecoveryInProgress())
1554  RecentFlushPtr = GetFlushRecPtr(NULL);
1555  else
1556  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1557 
1558  for (;;)
1559  {
1560  long sleeptime;
1561 
1562  /* Clear any already-pending wakeups */
1564 
1566 
1567  /* Process any requests or signals received recently */
1568  if (ConfigReloadPending)
1569  {
1570  ConfigReloadPending = false;
1573  }
1574 
1575  /* Check for input from the client */
1577 
1578  /*
1579  * If we're shutting down, trigger pending WAL to be written out,
1580  * otherwise we'd possibly end up waiting for WAL that never gets
1581  * written, because walwriter has shut down already.
1582  */
1583  if (got_STOPPING)
1585 
1586  /* Update our idea of the currently flushed position. */
1587  if (!RecoveryInProgress())
1588  RecentFlushPtr = GetFlushRecPtr(NULL);
1589  else
1590  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1591 
1592  /*
1593  * If postmaster asked us to stop, don't wait anymore.
1594  *
1595  * It's important to do this check after the recomputation of
1596  * RecentFlushPtr, so we can send all remaining data before shutting
1597  * down.
1598  */
1599  if (got_STOPPING)
1600  break;
1601 
1602  /*
1603  * We only send regular messages to the client for full decoded
1604  * transactions, but synchronous replication and walsender shutdown
1605  * may be waiting for a later location. So, before sleeping, we
1606  * send a ping containing the flush location. If the receiver is
1607  * otherwise idle, this keepalive will trigger a reply. Processing the
1608  * reply will update these MyWalSnd locations.
1609  */
1610  if (MyWalSnd->flush < sentPtr &&
1611  MyWalSnd->write < sentPtr &&
1614 
1615  /* check whether we're done */
1616  if (loc <= RecentFlushPtr)
1617  break;
1618 
1619  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1620  WalSndCaughtUp = true;
1621 
1622  /*
1623  * Try to flush any pending output to the client.
1624  */
1625  if (pq_flush_if_writable() != 0)
1626  WalSndShutdown();
1627 
1628  /*
1629  * If we have received CopyDone from the client, sent CopyDone
1630  * ourselves, and the output buffer is empty, it's time to exit
1631  * streaming, so fail the current WAL fetch request.
1632  */
1634  !pq_is_send_pending())
1635  break;
1636 
1637  /* die if timeout was reached */
1639 
1640  /* Send keepalive if the time has come */
1642 
1643  /*
1644  * Sleep until something happens or we time out. Also wait for the
1645  * socket becoming writable, if there's still pending output.
1646  * Otherwise we might sit on sendable output data while waiting for
1647  * new WAL to be generated. (But if we have nothing to send, we don't
1648  * want to wake on socket-writable.)
1649  */
1651 
1652  wakeEvents = WL_SOCKET_READABLE;
1653 
1654  if (pq_is_send_pending())
1655  wakeEvents |= WL_SOCKET_WRITEABLE;
1656 
1657  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
1658  }
1659 
1660  /* reactivate latch so WalSndLoop knows to continue */
1661  SetLatch(MyLatch);
1662  return RecentFlushPtr;
1663 }
bool XLogBackgroundFlush(void)
Definition: xlog.c:2725

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3432 of file walsender.c.

3433 {
3434  for (;;)
3435  {
3436  int i;
3437  bool all_stopped = true;
3438 
3439  for (i = 0; i < max_wal_senders; i++)
3440  {
3441  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3442 
3443  SpinLockAcquire(&walsnd->mutex);
3444 
3445  if (walsnd->pid == 0)
3446  {
3447  SpinLockRelease(&walsnd->mutex);
3448  continue;
3449  }
3450 
3451  if (walsnd->state != WALSNDSTATE_STOPPING)
3452  {
3453  all_stopped = false;
3454  SpinLockRelease(&walsnd->mutex);
3455  break;
3456  }
3457  SpinLockRelease(&walsnd->mutex);
3458  }
3459 
3460  /* safe to leave if confirmation is done for all WAL senders */
3461  if (all_stopped)
3462  return;
3463 
3464  pg_usleep(10000L); /* wait for 10 msec */
3465  }
3466 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3333 of file walsender.c.

3334 {
3335  /*
3336  * Wake up all the walsenders waiting on WAL being flushed or replayed
3337  * respectively. Note that waiting walsender would have prepared to sleep
3338  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3339  * before actually waiting.
3340  */
3341  if (physical)
3343 
3344  if (logical)
3346 }
void ConditionVariableBroadcast(ConditionVariable *cv)

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1374 of file walsender.c.

1376 {
1377  TimestampTz now;
1378 
1379  /*
1380  * Fill the send timestamp last, so that it is taken as late as possible.
1381  * This is somewhat ugly, but the protocol is set as it's already used for
1382  * several releases by streaming physical replication.
1383  */
1386  pq_sendint64(&tmpbuf, now);
1387  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1388  tmpbuf.data, sizeof(int64));
1389 
1390  /* output previously gathered data in a CopyData packet */
1391  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1392 
1394 
1395  /* Try to flush pending output to the client */
1396  if (pq_flush_if_writable() != 0)
1397  WalSndShutdown();
1398 
1399  /* Try taking fast path unless we get too close to walsender timeout. */
1401  wal_sender_timeout / 2) &&
1402  !pq_is_send_pending())
1403  {
1404  return;
1405  }
1406 
1407  /* If we have pending write here, go to slow path */
1409 }

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3045 of file walsender.c.

3046 {
3047  XLogRecord *record;
3048  char *errm;
3049 
3050  /*
3051  * We'll use the current flush point to determine whether we've caught up.
3052  * This variable is static in order to cache it across calls. Caching is
3053  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3054  * spinlock.
3055  */
3056  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3057 
3058  /*
3059  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3060  * true in WalSndWaitForWal, if we're actually waiting. We also set to
3061  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3062  * didn't wait - i.e. when we're shutting down.
3063  */
3064  WalSndCaughtUp = false;
3065 
3066  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3067 
3068  /* xlog record was invalid */
3069  if (errm != NULL)
3070  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3071  errm);
3072 
3073  if (record != NULL)
3074  {
3075  /*
3076  * Note the lack of any call to LagTrackerWrite() which is handled by
3077  * WalSndUpdateProgress which is called by output plugin through
3078  * logical decoding write api.
3079  */
3081 
3083  }
3084 
3085  /*
3086  * If first time through in this session, initialize flushPtr. Otherwise,
3087  * we only need to update flushPtr if EndRecPtr is past it.
3088  */
3089  if (flushPtr == InvalidXLogRecPtr ||
3090  logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3091  {
3093  flushPtr = GetStandbyFlushRecPtr(NULL);
3094  else
3095  flushPtr = GetFlushRecPtr(NULL);
3096  }
3097 
3098  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3099  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3100  WalSndCaughtUp = true;
3101 
3102  /*
3103  * If we're caught up and have been requested to stop, have WalSndLoop()
3104  * terminate the connection in an orderly manner, after writing out all
3105  * the pending data.
3106  */
3108  got_SIGUSR2 = true;
3109 
3110  /* Update shared memory status */
3111  {
3112  WalSnd *walsnd = MyWalSnd;
3113 
3114  SpinLockAcquire(&walsnd->mutex);
3115  walsnd->sentPtr = sentPtr;
3116  SpinLockRelease(&walsnd->mutex);
3117  }
3118 }
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:91
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:406

References am_cascading_walsender, elog(), XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2745 of file walsender.c.

2746 {
2747  XLogRecPtr SendRqstPtr;
2748  XLogRecPtr startptr;
2749  XLogRecPtr endptr;
2750  Size nbytes;
2751  XLogSegNo segno;
2752  WALReadError errinfo;
2753 
2754  /* If requested switch the WAL sender to the stopping state. */
2755  if (got_STOPPING)
2757 
2759  {
2760  WalSndCaughtUp = true;
2761  return;
2762  }
2763 
2764  /* Figure out how far we can safely send the WAL. */
2766  {
2767  /*
2768  * Streaming an old timeline that's in this server's history, but is
2769  * not the one we're currently inserting or replaying. It can be
2770  * streamed up to the point where we switched off that timeline.
2771  */
2772  SendRqstPtr = sendTimeLineValidUpto;
2773  }
2774  else if (am_cascading_walsender)
2775  {
2776  TimeLineID SendRqstTLI;
2777 
2778  /*
2779  * Streaming the latest timeline on a standby.
2780  *
2781  * Attempt to send all WAL that has already been replayed, so that we
2782  * know it's valid. If we're receiving WAL through streaming
2783  * replication, it's also OK to send any WAL that has been received
2784  * but not replayed.
2785  *
2786  * The timeline we're recovering from can change, or we can be
2787  * promoted. In either case, the current timeline becomes historic. We
2788  * need to detect that so that we don't try to stream past the point
2789  * where we switched to another timeline. We check for promotion or
2790  * timeline switch after calculating FlushPtr, to avoid a race
2791  * condition: if the timeline becomes historic just after we checked
2792  * that it was still current, it's still OK to stream it up to the
2793  * FlushPtr that was calculated before it became historic.
2794  */
2795  bool becameHistoric = false;
2796 
2797  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
2798 
2799  if (!RecoveryInProgress())
2800  {
2801  /* We have been promoted. */
2802  SendRqstTLI = GetWALInsertionTimeLine();
2803  am_cascading_walsender = false;
2804  becameHistoric = true;
2805  }
2806  else
2807  {
2808  /*
2809  * Still a cascading standby. But is the timeline we're sending
2810  * still the one recovery is recovering from?
2811  */
2812  if (sendTimeLine != SendRqstTLI)
2813  becameHistoric = true;
2814  }
2815 
2816  if (becameHistoric)
2817  {
2818  /*
2819  * The timeline we were sending has become historic. Read the
2820  * timeline history file of the new timeline to see where exactly
2821  * we forked off from the timeline we were sending.
2822  */
2823  List *history;
2824 
2825  history = readTimeLineHistory(SendRqstTLI);
2827 
2829  list_free_deep(history);
2830 
2831  sendTimeLineIsHistoric = true;
2832 
2833  SendRqstPtr = sendTimeLineValidUpto;
2834  }
2835  }
2836  else
2837  {
2838  /*
2839  * Streaming the current timeline on a primary.
2840  *
2841  * Attempt to send all data that's already been written out and
2842  * fsync'd to disk. We cannot go further than what's been written out
2843  * given the current implementation of WALRead(). And in any case
2844  * it's unsafe to send WAL that is not securely down to disk on the
2845  * primary: if the primary subsequently crashes and restarts, standbys
2846  * must not have applied any WAL that got lost on the primary.
2847  */
2848  SendRqstPtr = GetFlushRecPtr(NULL);
2849  }
2850 
2851  /*
2852  * Record the current system time as an approximation of the time at which
2853  * this WAL location was written for the purposes of lag tracking.
2854  *
2855  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2856  * is flushed and we could get that time as well as the LSN when we call
2857  * GetFlushRecPtr() above (and likewise for the cascading standby
2858  * equivalent), but rather than putting any new code into the hot WAL path
2859  * it seems good enough to capture the time here. We should reach this
2860  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2861  * may take some time, we read the WAL flush pointer and take the time
2862  * very close together here so that we'll get a later position if it is
2863  * still moving.
2864  *
2865  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2866  * this gives us a cheap approximation for the WAL flush time for this
2867  * LSN.
2868  *
2869  * Note that the LSN is not necessarily the LSN for the data contained in
2870  * the present message; it's the end of the WAL, which might be further
2871  * ahead. All the lag tracking machinery cares about is finding out when
2872  * that arbitrary LSN is eventually reported as written, flushed and
2873  * applied, so that it can measure the elapsed time.
2874  */
2875  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2876 
2877  /*
2878  * If this is a historic timeline and we've reached the point where we
2879  * forked to the next timeline, stop streaming.
2880  *
2881  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2882  * startup process will normally replay all WAL that has been received
2883  * from the primary, before promoting, but if the WAL streaming is
2884  * terminated at a WAL page boundary, the valid portion of the timeline
2885  * might end in the middle of a WAL record. We might've already sent the
2886  * first half of that partial WAL record to the cascading standby, so that
2887  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2888  * replay the partial WAL record either, so it can still follow our
2889  * timeline switch.
2890  */
2892  {
2893  /* close the current file. */
2894  if (xlogreader->seg.ws_file >= 0)
2896 
2897  /* Send CopyDone */
2898  pq_putmessage_noblock('c', NULL, 0);
2899  streamingDoneSending = true;
2900 
2901  WalSndCaughtUp = true;
2902 
2903  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2906  return;
2907  }
2908 
2909  /* Do we have any work to do? */
2910  Assert(sentPtr <= SendRqstPtr);
2911  if (SendRqstPtr <= sentPtr)
2912  {
2913  WalSndCaughtUp = true;
2914  return;
2915  }
2916 
2917  /*
2918  * Figure out how much to send in one message. If there's no more than
2919  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2920  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2921  *
2922  * The rounding is not only for performance reasons. Walreceiver relies on
2923  * the fact that we never split a WAL record across two messages. Since a
2924  * long WAL record is split at page boundary into continuation records,
2925  * page boundary is always a safe cut-off point. We also assume that
2926  * SendRqstPtr never points to the middle of a WAL record.
2927  */
2928  startptr = sentPtr;
2929  endptr = startptr;
2930  endptr += MAX_SEND_SIZE;
2931 
2932  /* if we went beyond SendRqstPtr, back off */
2933  if (SendRqstPtr <= endptr)
2934  {
2935  endptr = SendRqstPtr;
2937  WalSndCaughtUp = false;
2938  else
2939  WalSndCaughtUp = true;
2940  }
2941  else
2942  {
2943  /* round down to page boundary. */
2944  endptr -= (endptr % XLOG_BLCKSZ);
2945  WalSndCaughtUp = false;
2946  }
2947 
2948  nbytes = endptr - startptr;
2949  Assert(nbytes <= MAX_SEND_SIZE);
2950 
2951  /*
2952  * OK to read and send the slice.
2953  */
2955  pq_sendbyte(&output_message, 'w');
2956 
2957  pq_sendint64(&output_message, startptr); /* dataStart */
2958  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2959  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2960 
2961  /*
2962  * Read the log directly into the output buffer to avoid extra memcpy
2963  * calls.
2964  */
2966 
2967 retry:
2968  if (!WALRead(xlogreader,
2970  startptr,
2971  nbytes,
2972  xlogreader->seg.ws_tli, /* Pass the current TLI because
2973  * only WalSndSegmentOpen controls
2974  * whether new TLI is needed. */
2975  &errinfo))
2976  WALReadRaiseError(&errinfo);
2977 
2978  /* See logical_read_xlog_page(). */
2979  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2981 
2982  /*
2983  * During recovery, the currently-open WAL file might be replaced with the
2984  * file of the same name retrieved from archive. So we always need to
2985  * check what we read was valid after reading into the buffer. If it's
2986  * invalid, we try to open and read the file again.
2987  */
2989  {
2990  WalSnd *walsnd = MyWalSnd;
2991  bool reload;
2992 
2993  SpinLockAcquire(&walsnd->mutex);
2994  reload = walsnd->needreload;
2995  walsnd->needreload = false;
2996  SpinLockRelease(&walsnd->mutex);
2997 
2998  if (reload && xlogreader->seg.ws_file >= 0)
2999  {
3001 
3002  goto retry;
3003  }
3004  }
3005 
3006  output_message.len += nbytes;
3008 
3009  /*
3010  * Fill the send timestamp last, so that it is taken as late as possible.
3011  */
3014  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3015  tmpbuf.data, sizeof(int64));
3016 
3018 
3019  sentPtr = endptr;
3020 
3021  /* Update shared memory status */
3022  {
3023  WalSnd *walsnd = MyWalSnd;
3024 
3025  SpinLockAcquire(&walsnd->mutex);
3026  walsnd->sentPtr = sentPtr;
3027  SpinLockRelease(&walsnd->mutex);
3028  }
3029 
3030  /* Report progress of XLOG streaming in PS display */
3032  {
3033  char activitymsg[50];
3034 
3035  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3037  set_ps_display(activitymsg);
3038  }
3039 }
bool update_process_title
Definition: ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:271
#define MAX_SEND_SIZE
Definition: walsender.c:107

References am_cascading_walsender, Assert(), CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog(), enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 119 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 220 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

Definition at line 126 of file walsender.c.

Referenced by exec_replication_command().

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 198 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

StringInfoData output_message
static

Definition at line 158 of file walsender.c.

Referenced by exec_replication_command(), WalSndKeepalive(), and XLogSendPhysical().

◆ replication_active

volatile sig_atomic_t replication_active = false
static

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 181 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 131 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader