PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 207 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

Referenced by pg_stat_get_wal_senders().

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

Referenced by ReadReplicationSlot().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 225 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd cmd)
static

Definition at line 1027 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

1028 {
1029  const char *snapshot_name = NULL;
1030  char xloc[MAXFNAMELEN];
1031  char *slot_name;
1032  bool reserve_wal = false;
1033  bool two_phase = false;
1034  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1035  DestReceiver *dest;
1036  TupOutputState *tstate;
1037  TupleDesc tupdesc;
1038  Datum values[4];
1039  bool nulls[4];
1040 
1041  Assert(!MyReplicationSlot);
1042 
1043  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1044 
1045  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1046  {
1047  ReplicationSlotCreate(cmd->slotname, false,
1048  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1049  false);
1050  }
1051  else
1052  {
1053  CheckLogicalDecodingRequirements();
1054 
1055  /*
1056  * Initially create persistent slot as ephemeral - that allows us to
1057  * nicely handle errors during initialization because it'll get
1058  * dropped if this transaction fails. We'll make it persistent at the
1059  * end. Temporary slots can be created as temporary from beginning as
1060  * they get dropped on error as well.
1061  */
1062  ReplicationSlotCreate(cmd->slotname, true,
1063  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1064  two_phase);
1065  }
1066 
1067  if (cmd->kind == REPLICATION_KIND_LOGICAL)
1068  {
1069  LogicalDecodingContext *ctx;
1070  bool need_full_snapshot = false;
1071 
1072  /*
1073  * Do options check early so that we can bail before calling the
1074  * DecodingContextFindStartpoint which can take long time.
1075  */
1076  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1077  {
1078  if (IsTransactionBlock())
1079  ereport(ERROR,
1080  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1081  (errmsg("%s must not be called inside a transaction",
1082  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1083 
1084  need_full_snapshot = true;
1085  }
1086  else if (snapshot_action == CRS_USE_SNAPSHOT)
1087  {
1088  if (!IsTransactionBlock())
1089  ereport(ERROR,
1090  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1091  (errmsg("%s must be called inside a transaction",
1092  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1093 
1094  if (XactIsoLevel != XACT_REPEATABLE_READ)
1095  ereport(ERROR,
1096  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1097  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1098  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1099 
1100  if (FirstSnapshotSet)
1101  ereport(ERROR,
1102  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1103  (errmsg("%s must be called before any query",
1104  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1105 
1106  if (IsSubTransaction())
1107  ereport(ERROR,
1108  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1109  (errmsg("%s must not be called in a subtransaction",
1110  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1111 
1112  need_full_snapshot = true;
1113  }
1114 
1115  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1116  InvalidXLogRecPtr,
1117  XL_ROUTINE(.page_read = logical_read_xlog_page,
1118  .segment_open = WalSndSegmentOpen,
1119  .segment_close = wal_segment_close),
1122 
1123  /*
1124  * Signal that we don't need the timeout mechanism. We're just
1125  * creating the replication slot and don't yet accept feedback
1126  * messages or send keepalives. As we possibly need to wait for
1127  * further WAL the walsender would otherwise possibly be killed too
1128  * soon.
1129  */
1130  last_reply_timestamp = 0;
1131 
1132  /* build initial snapshot, might take a while */
1133  DecodingContextFindStartpoint(ctx);
1134 
1135  /*
1136  * Export or use the snapshot if we've been asked to do so.
1137  *
1138  * NB. We will convert the snapbuild.c kind of snapshot to normal
1139  * snapshot when doing this.
1140  */
1141  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1142  {
1143  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1144  }
1145  else if (snapshot_action == CRS_USE_SNAPSHOT)
1146  {
1147  Snapshot snap;
1148 
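1149  snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1150  RestoreTransactionSnapshot(snap, MyProc);
1151  }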
1152 
1153  /* don't need the decoding context anymore */
1154  FreeDecodingContext(ctx);
1155 
1156  if (!cmd->temporary)
1157  ReplicationSlotPersist();
1158  }
1159  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1160  {
1161  ReplicationSlotReserveWal();
1162 
1163  ReplicationSlotMarkDirty();
1164 
1165  /* Write this slot to disk if it's a permanent one. */
1166  if (!cmd->temporary)
1167  ReplicationSlotSave();
1168  }
1169 
1170  snprintf(xloc, sizeof(xloc), "%X/%X",
1171  LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1172 
1173  dest = CreateDestReceiver(DestRemoteSimple);
1174  MemSet(nulls, false, sizeof(nulls));
1175 
1176  /*----------
1177  * Need a tuple descriptor representing four columns:
1178  * - first field: the slot name
1179  * - second field: LSN at which we became consistent
1180  * - third field: exported snapshot's name
1181  * - fourth field: output plugin
1182  *----------
1183  */
1184  tupdesc = CreateTemplateTupleDesc(4);
1185  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1186  TEXTOID, -1, 0);
1187  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1188  TEXTOID, -1, 0);
1189  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1190  TEXTOID, -1, 0);
1191  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1192  TEXTOID, -1, 0);
1193 
1194  /* prepare for projection of tuples */
1195  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1196 
1197  /* slot_name */
1198  slot_name = NameStr(MyReplicationSlot->data.name);
1199  values[0] = CStringGetTextDatum(slot_name);
1200 
1201  /* consistent wal location */
1202  values[1] = CStringGetTextDatum(xloc);
1203 
1204  /* snapshot name, or NULL if none */
1205  if (snapshot_name != NULL)
1206  values[2] = CStringGetTextDatum(snapshot_name);
1207  else
1208  nulls[2] = true;
1209 
1210  /* plugin, or NULL if none */
1211  if (cmd->plugin != NULL)
1212  values[3] = CStringGetTextDatum(cmd->plugin);
1213  else
1214  nulls[3] = true;
1215 
1216  /* send it to dest */
1217  do_tup_output(tstate, values, nulls);
1218  end_tup_output(tstate);
1219 
1220  ReplicationSlotRelease();
1221 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
PGPROC * MyProc
Definition: proc.c:68
#define XACT_REPEATABLE_READ
Definition: xact.h:38
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
#define MemSet(start, val, len)
Definition: c.h:1008
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:226
void ReplicationSlotSave(void)
Definition: slot.c:710
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2241
ReplicationSlotPersistentData data
Definition: slot.h:147
XLogRecPtr confirmed_flush
Definition: slot.h:84
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4703
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:622
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void ReplicationSlotReserveWal(void)
Definition: slot.c:1081
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:589
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
#define ERROR
Definition: elog.h:46
bool FirstSnapshotSet
Definition: snapmgr.c:149
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
void ReplicationSlotPersist(void)
Definition: slot.c:745
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1364
void ReplicationSlotRelease(void)
Definition: slot.c:469
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1337
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:411
static bool two_phase
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2577
#define ereport(elevel,...)
Definition: elog.h:157
struct SnapBuild * snapshot_builder
Definition: logical.h:43
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:633
int XactIsoLevel
Definition: xact.c:76
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:959
bool IsSubTransaction(void)
Definition: xact.c:4776
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:530
static Datum values[MAXATTR]
Definition: bootstrap.c:156
int errmsg(const char *fmt,...)
Definition: elog.c:909
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:681
#define CStringGetTextDatum(s)
Definition: builtins.h:86
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:905
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
#define snprintf
Definition: port.h:217
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:318
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
void ReplicationSlotMarkDirty(void)
Definition: slot.c:728
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1448

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd cmd)
static

Definition at line 1227 of file walsender.c.

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1228 {
1229  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1230 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:563

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1608 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_ReadReplicationSlotCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1609 {
1610  int parse_rc;
1611  Node *cmd_node;
1612  const char *cmdtag;
1613  MemoryContext cmd_context;
1614  MemoryContext old_context;
1615 
1616  /*
1617  * If WAL sender has been told that shutdown is getting close, switch its
1618  * status accordingly to handle the next replication commands correctly.
1619  */
1620  if (got_STOPPING)
1621  WalSndSetState(WALSNDSTATE_STOPPING);
1622 
1623  /*
1624  * Throw error if in stopping mode. We need prevent commands that could
1625  * generate WAL while the shutdown checkpoint is being written. To be
1626  * safe, we just prohibit all new commands.
1627  */
1628  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1629  ereport(ERROR,
1630  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1631 
1632  /*
1633  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1634  * command arrives. Clean up the old stuff if there's anything.
1635  */
1636  SnapBuildClearExportedSnapshot();
1637 
1638  CHECK_FOR_INTERRUPTS();
1639 
1640  /*
1641  * Parse the command.
1642  */
1643  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1644  "Replication command context",
1645  ALLOCSET_DEFAULT_SIZES);
1646  old_context = MemoryContextSwitchTo(cmd_context);
1647 
1648  replication_scanner_init(cmd_string);
1649  parse_rc = replication_yyparse();
1650  if (parse_rc != 0)
1651  ereport(ERROR,
1652  (errcode(ERRCODE_SYNTAX_ERROR),
1653  errmsg_internal("replication command parser returned %d",
1654  parse_rc)));
1655  replication_scanner_finish();
1656 
1657  cmd_node = replication_parse_result;
1658 
1659  /*
1660  * If it's a SQL command, just clean up our mess and return false; the
1661  * caller will take care of executing it.
1662  */
1663  if (IsA(cmd_node, SQLCmd))
1664  {
1665  if (MyDatabaseId == InvalidOid)
1666  ereport(ERROR,
1667  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1668 
1669  MemoryContextSwitchTo(old_context);
1670  MemoryContextDelete(cmd_context);
1671 
1672  /* Tell the caller that this wasn't a WalSender command. */
1673  return false;
1674  }
1675 
1676  /*
1677  * Report query to various monitoring facilities. For this purpose, we
1678  * report replication commands just like SQL commands.
1679  */
1680  debug_query_string = cmd_string;
1681 
1682  pgstat_report_activity(STATE_RUNNING, cmd_string);
1683 
1684  /*
1685  * Log replication command if log_replication_commands is enabled. Even
1686  * when it's disabled, log the command with DEBUG1 level for backward
1687  * compatibility.
1688  */
1689  ereport(log_replication_commands ? LOG : DEBUG1,
1690  (errmsg("received replication command: %s", cmd_string)));
1691 
1692  /*
1693  * Disallow replication commands in aborted transaction blocks.
1694  */
1695  if (IsAbortedTransactionBlockState())
1696  ereport(ERROR,
1697  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1698  errmsg("current transaction is aborted, "
1699  "commands ignored until end of transaction block")));
1700 
1701  CHECK_FOR_INTERRUPTS();
1702 
1703  /*
1704  * Allocate buffers that will be used for each outgoing and incoming
1705  * message. We do this just once per command to reduce palloc overhead.
1706  */
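1707  initStringInfo(&output_message);
1708  initStringInfo(&reply_message);
1709  initStringInfo(&tmpbuf);
1710 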
1711  switch (cmd_node->type)
1712  {
1713  case T_IdentifySystemCmd:
1714  cmdtag = "IDENTIFY_SYSTEM";
1715  set_ps_display(cmdtag);
1716  IdentifySystem();
1717  EndReplicationCommand(cmdtag);
1718  break;
1719 
1720  case T_ReadReplicationSlotCmd:
1721  cmdtag = "READ_REPLICATION_SLOT";
1722  set_ps_display(cmdtag);
1723  ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
1724  EndReplicationCommand(cmdtag);
1725  break;
1726 
1727  case T_BaseBackupCmd:
1728  cmdtag = "BASE_BACKUP";
1729  set_ps_display(cmdtag);
1730  PreventInTransactionBlock(true, cmdtag);
1731  SendBaseBackup((BaseBackupCmd *) cmd_node);
1732  EndReplicationCommand(cmdtag);
1733  break;
1734 
1735  case T_CreateReplicationSlotCmd:
1736  cmdtag = "CREATE_REPLICATION_SLOT";
1737  set_ps_display(cmdtag);
1738  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1739  EndReplicationCommand(cmdtag);
1740  break;
1741 
1742  case T_DropReplicationSlotCmd:
1743  cmdtag = "DROP_REPLICATION_SLOT";
1744  set_ps_display(cmdtag);
1745  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1746  EndReplicationCommand(cmdtag);
1747  break;
1748 
1749  case T_StartReplicationCmd:
1750  {
1751  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1752 
1753  cmdtag = "START_REPLICATION";
1754  set_ps_display(cmdtag);
1755  PreventInTransactionBlock(true, cmdtag);
1756 
1757  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1758  StartReplication(cmd);
1759  else
1760  StartLogicalReplication(cmd);
1761 
1762  /* dupe, but necessary per libpqrcv_endstreaming */
1763  EndReplicationCommand(cmdtag);
1764 
1765  Assert(xlogreader != NULL);
1766  break;
1767  }
1768 
1769  case T_TimeLineHistoryCmd:
1770  cmdtag = "TIMELINE_HISTORY";
1771  set_ps_display(cmdtag);
1772  PreventInTransactionBlock(true, cmdtag);
1773  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1774  EndReplicationCommand(cmdtag);
1775  break;
1776 
1777  case T_VariableShowStmt:
1778  {
1779  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1780  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1781 
1782  cmdtag = "SHOW";
1783  set_ps_display(cmdtag);
1784 
1785  /* syscache access needs a transaction environment */
1786  StartTransactionCommand();
1787  GetPGVariable(n->name, dest);
1788  CommitTransactionCommand();
1789  EndReplicationCommand(cmdtag);
1790  }
1791  break;
1792 
1793  default:
1794  elog(ERROR, "unrecognized replication command node tag: %u",
1795  cmd_node->type);
1796  }
1797 
1798  /* done */
1799  MemoryContextSwitchTo(old_context);
1800  MemoryContextDelete(cmd_context);
1801 
1802  /*
1803  * We need not update ps display or pg_stat_activity, because PostgresMain
1804  * will reset those to "idle". But we must reset debug_query_string to
1805  * ensure it doesn't become a dangling pointer.
1806  */
1807  debug_query_string = NULL;
1808 
1809  return true;
1810 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:589
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:564
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1227
void CommitTransactionCommand(void)
Definition: xact.c:2959
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:392
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:538
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9352
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:46
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:944
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
WalSndState state
NodeTag type
Definition: nodes.h:540
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:670
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3399
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:89
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1237
Oid MyDatabaseId
Definition: globals.c:88
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3335
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:463
void StartTransactionCommand(void)
Definition: xact.c:2858
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1027
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define elog(elevel,...)
Definition: elog.h:232
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static void IdentifySystem(void)
Definition: walsender.c:377
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:683
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 3076 of file walsender.c.

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

3077 {
3078  XLogRecPtr replayPtr;
3079  TimeLineID replayTLI;
3080  XLogRecPtr receivePtr;
3081  TimeLineID receiveTLI;
3082  XLogRecPtr result;
3083 
3084  /*
3085  * We can safely send what's already been replayed. Also, if walreceiver
3086  * is streaming WAL from the same timeline, we can send anything that it
3087  * has streamed, but hasn't been replayed yet.
3088  */
3089 
3090  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3091  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3092 
3093  ThisTimeLineID = replayTLI;
3094 
3095  result = replayPtr;
3096  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
3097  result = receivePtr;
3098 
3099  return result;
3100 }
uint32 TimeLineID
Definition: xlogdefs.h:59
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11942
static TimeLineID receiveTLI
Definition: xlog.c:201
TimeLineID ThisTimeLineID
Definition: xlog.c:195
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3129 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3130 {
3131  Assert(am_walsender);
3132 
3133  /*
3134  * If replication has not yet started, die like with SIGTERM. If
3135  * replication is active, only set a flag and wake up the main loop. It
3136  * will send any outstanding WAL, wait for it to be replicated to the
3137  * standby, and then exit gracefully.
3138  */
3139  if (!replication_active)
3140  kill(MyProcPid, SIGTERM);
3141  else
3142  got_STOPPING = true;
3143 }
int MyProcPid
Definition: globals.c:43
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:464
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:804

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 377 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

378 {
379  char sysid[32];
380  char xloc[MAXFNAMELEN];
381  XLogRecPtr logptr;
382  char *dbname = NULL;
384  TupOutputState *tstate;
385  TupleDesc tupdesc;
386  Datum values[4];
387  bool nulls[4];
388 
389  /*
390  * Reply with a result set with one row, four columns. First col is system
391  * ID, second is timeline ID, third is current xlog location and the
392  * fourth contains the database name if we are connected to one.
393  */
394 
395  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
397 
400  {
401  /* this also updates ThisTimeLineID */
402  logptr = GetStandbyFlushRecPtr();
403  }
404  else
405  logptr = GetFlushRecPtr();
406 
407  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
408 
409  if (MyDatabaseId != InvalidOid)
410  {
412 
413  /* syscache access needs a transaction env. */
415  /* make dbname live outside TX context */
419  /* CommitTransactionCommand switches to TopMemoryContext */
421  }
422 
424  MemSet(nulls, false, sizeof(nulls));
425 
426  /* need a tuple descriptor representing four columns */
427  tupdesc = CreateTemplateTupleDesc(4);
428  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
429  TEXTOID, -1, 0);
430  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
431  INT4OID, -1, 0);
432  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
433  TEXTOID, -1, 0);
434  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
435  TEXTOID, -1, 0);
436 
437  /* prepare for projection of tuples */
438  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
439 
440  /* column 1: system identifier */
441  values[0] = CStringGetTextDatum(sysid);
442 
443  /* column 2: timeline */
444  values[1] = Int32GetDatum(ThisTimeLineID);
445 
446  /* column 3: wal location */
447  values[2] = CStringGetTextDatum(xloc);
448 
449  /* column 4: database name, or NULL if none */
450  if (dbname)
451  values[3] = CStringGetTextDatum(dbname);
452  else
453  nulls[3] = true;
454 
455  /* send it to dest */
456  do_tup_output(tstate, values, nulls);
457 
458  end_tup_output(tstate);
459 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void CommitTransactionCommand(void)
Definition: xact.c:2959
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:1008
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8698
bool RecoveryInProgress(void)
Definition: xlog.c:8341
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2113
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:411
Oid MyDatabaseId
Definition: globals.c:88
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:195
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2858
char * dbname
Definition: streamutil.c:51
static Datum values[MAXATTR]
Definition: bootstrap.c:156
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4999
#define Int32GetDatum(X)
Definition: postgres.h:523
#define CStringGetTextDatum(s)
Definition: builtins.h:86
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:3076
#define snprintf
Definition: port.h:217
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:484
bool am_cascading_walsender
Definition: walsender.c:116

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2499 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

2500 {
2501  int i;
2502 
2503  /*
2504  * WalSndCtl should be set up already (we inherit this by fork() or
2505  * EXEC_BACKEND mechanism from the postmaster).
2506  */
2507  Assert(WalSndCtl != NULL);
2508  Assert(MyWalSnd == NULL);
2509 
2510  /*
2511  * Find a free walsender slot and reserve it. This must not fail due to
2512  * the prior check for free WAL senders in InitProcess().
2513  */
2514  for (i = 0; i < max_wal_senders; i++)
2515  {
2516  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2517 
2518  SpinLockAcquire(&walsnd->mutex);
2519 
2520  if (walsnd->pid != 0)
2521  {
2522  SpinLockRelease(&walsnd->mutex);
2523  continue;
2524  }
2525  else
2526  {
2527  /*
2528  * Found a free slot. Reserve it for us.
2529  */
2530  walsnd->pid = MyProcPid;
2531  walsnd->state = WALSNDSTATE_STARTUP;
2532  walsnd->sentPtr = InvalidXLogRecPtr;
2533  walsnd->needreload = false;
2534  walsnd->write = InvalidXLogRecPtr;
2535  walsnd->flush = InvalidXLogRecPtr;
2536  walsnd->apply = InvalidXLogRecPtr;
2537  walsnd->writeLag = -1;
2538  walsnd->flushLag = -1;
2539  walsnd->applyLag = -1;
2540  walsnd->sync_standby_priority = 0;
2541  walsnd->latch = &MyProc->procLatch;
2542  walsnd->replyTime = 0;
2543  SpinLockRelease(&walsnd->mutex);
2544  /* don't need the lock anymore */
2545  MyWalSnd = (WalSnd *) walsnd;
2546 
2547  break;
2548  }
2549  }
2550 
2551  Assert(MyWalSnd != NULL);
2552 
2553  /* Arrange to clean up at walsender exit */
2555 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:43
PGPROC * MyProc
Definition: proc.c:68
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2559
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:804
int sync_standby_priority
bool needreload
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3711 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

3712 {
3713  TimestampTz time = 0;
3714 
3715  /* Read all unread samples up to this LSN or end of buffer. */
3716  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3717  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3718  {
3719  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3720  lag_tracker->last_read[head] =
3722  lag_tracker->read_heads[head] =
3724  }
3725 
3726  /*
3727  * If the lag tracker is empty, that means the standby has processed
3728  * everything we've ever sent so we should now clear 'last_read'. If we
3729  * didn't do that, we'd risk using a stale and irrelevant sample for
3730  * interpolation at the beginning of the next burst of WAL after a period
3731  * of idleness.
3732  */
3734  lag_tracker->last_read[head].time = 0;
3735 
3736  if (time > now)
3737  {
3738  /* If the clock somehow went backwards, treat as not found. */
3739  return -1;
3740  }
3741  else if (time == 0)
3742  {
3743  /*
3744  * We didn't cross a time. If there is a future sample that we
3745  * haven't reached yet, and we've already reached at least one sample,
3746  * let's interpolate the local flushed time. This is mainly useful
3747  * for reporting a completely stuck apply position as having
3748  * increasing lag, since otherwise we'd have to wait for it to
3749  * eventually start moving again and cross one of our samples before
3750  * we can show the lag increasing.
3751  */
3753  {
3754  /* There are no future samples, so we can't interpolate. */
3755  return -1;
3756  }
3757  else if (lag_tracker->last_read[head].time != 0)
3758  {
3759  /* We can interpolate between last_read and the next sample. */
3760  double fraction;
3761  WalTimeSample prev = lag_tracker->last_read[head];
3763 
3764  if (lsn < prev.lsn)
3765  {
3766  /*
3767  * Reported LSNs shouldn't normally go backwards, but it's
3768  * possible when there is a timeline change. Treat as not
3769  * found.
3770  */
3771  return -1;
3772  }
3773 
3774  Assert(prev.lsn < next.lsn);
3775 
3776  if (prev.time > next.time)
3777  {
3778  /* If the clock somehow went backwards, treat as not found. */
3779  return -1;
3780  }
3781 
3782  /* See how far we are between the previous and next samples. */
3783  fraction =
3784  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3785 
3786  /* Scale the local flush time proportionally. */
3787  time = (TimestampTz)
3788  ((double) prev.time + (next.time - prev.time) * fraction);
3789  }
3790  else
3791  {
3792  /*
3793  * We have only a future sample, implying that we were entirely
3794  * caught up but and now there is a new burst of WAL and the
3795  * standby hasn't processed the first sample yet. Until the
3796  * standby reaches the future sample the best we can do is report
3797  * the hypothetical lag if that sample were to be replayed now.
3798  */
3799  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3800  }
3801  }
3802 
3803  /* Return the elapsed time since local flush time in microseconds. */
3804  Assert(time != 0);
3805  return now - time;
3806 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static int32 next
Definition: blutils.c:219
int write_head
Definition: walsender.c:214
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
XLogRecPtr lsn
Definition: walsender.c:202
#define Assert(condition)
Definition: c.h:804
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3646 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3647 {
3648  bool buffer_full;
3649  int new_write_head;
3650  int i;
3651 
3652  if (!am_walsender)
3653  return;
3654 
3655  /*
3656  * If the lsn hasn't advanced since last time, then do nothing. This way
3657  * we only record a new sample when new WAL has been written.
3658  */
3659  if (lag_tracker->last_lsn == lsn)
3660  return;
3661  lag_tracker->last_lsn = lsn;
3662 
3663  /*
3664  * If advancing the write head of the circular buffer would crash into any
3665  * of the read heads, then the buffer is full. In other words, the
3666  * slowest reader (presumably apply) is the one that controls the release
3667  * of space.
3668  */
3669  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3670  buffer_full = false;
3671  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3672  {
3673  if (new_write_head == lag_tracker->read_heads[i])
3674  buffer_full = true;
3675  }
3676 
3677  /*
3678  * If the buffer is full, for now we just rewind by one slot and overwrite
3679  * the last sample, as a simple (if somewhat uneven) way to lower the
3680  * sampling rate. There may be better adaptive compaction algorithms.
3681  */
3682  if (buffer_full)
3683  {
3684  new_write_head = lag_tracker->write_head;
3685  if (lag_tracker->write_head > 0)
3687  else
3689  }
3690 
3691  /* Store a sample at the current write head position. */
3693  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3694  lag_tracker->write_head = new_write_head;
3695 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
int write_head
Definition: walsender.c:214
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
bool am_walsender
Definition: walsender.c:115
XLogRecPtr lsn
Definition: walsender.c:202
XLogRecPtr last_lsn
Definition: walsender.c:212
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 905 of file walsender.c.

References CheckXLogRemoved(), XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

907 {
908  XLogRecPtr flushptr;
909  int count;
910  WALReadError errinfo;
911  XLogSegNo segno;
912 
913  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
915  sendTimeLine = state->currTLI;
917  sendTimeLineNextTLI = state->nextTLI;
918 
919  /* make sure we have enough WAL available */
920  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
921 
922  /* fail if not (implies we are going to shut down) */
923  if (flushptr < targetPagePtr + reqLen)
924  return -1;
925 
926  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
927  count = XLOG_BLCKSZ; /* more than one block available */
928  else
929  count = flushptr - targetPagePtr; /* part of the page available */
930 
931  /* now actually read the data, we know it's there */
932  if (!WALRead(state,
933  cur_page,
934  targetPagePtr,
935  XLOG_BLCKSZ,
936  state->seg.ws_tli, /* Pass the current TLI because only
937  * WalSndSegmentOpen controls whether new
938  * TLI is needed. */
939  &errinfo))
940  WALReadRaiseError(&errinfo);
941 
942  /*
943  * After reading into the buffer, check that what we read was valid. We do
944  * this after reading, because even though the segment was present when we
945  * opened it, it might get recycled or removed while we read it. The
946  * read() succeeds in that case, but the data we tried to read might
947  * already have been overwritten with new WAL records.
948  */
949  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
950  CheckXLogRemoved(segno, state->seg.ws_tli);
951 
952  return count;
953 }
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:975
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:707
WALOpenSegment seg
Definition: xlogreader.h:225
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:4000
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:248
TimeLineID nextTLI
Definition: xlogreader.h:254
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1474
TimeLineID ThisTimeLineID
Definition: xlog.c:195
TimeLineID currTLI
Definition: xlogreader.h:238
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
TimeLineID ws_tli
Definition: xlogreader.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1100
WALSegmentContext segcxt
Definition: xlogreader.h:224
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3373 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

3374 {
3375  Interval *result = palloc(sizeof(Interval));
3376 
3377  result->month = 0;
3378  result->day = 0;
3379  result->time = offset;
3380 
3381  return result;
3382 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:1062

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action,
bool two_phase 
)
static

Definition at line 959 of file walsender.c.

References action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

963 {
964  ListCell *lc;
965  bool snapshot_action_given = false;
966  bool reserve_wal_given = false;
967  bool two_phase_given = false;
968 
969  /* Parse options */
970  foreach(lc, cmd->options)
971  {
972  DefElem *defel = (DefElem *) lfirst(lc);
973 
974  if (strcmp(defel->defname, "snapshot") == 0)
975  {
976  char *action;
977 
978  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
979  ereport(ERROR,
980  (errcode(ERRCODE_SYNTAX_ERROR),
981  errmsg("conflicting or redundant options")));
982 
983  action = defGetString(defel);
984  snapshot_action_given = true;
985 
986  if (strcmp(action, "export") == 0)
987  *snapshot_action = CRS_EXPORT_SNAPSHOT;
988  else if (strcmp(action, "nothing") == 0)
989  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
990  else if (strcmp(action, "use") == 0)
991  *snapshot_action = CRS_USE_SNAPSHOT;
992  else
993  ereport(ERROR,
994  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
995  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
996  defel->defname, action)));
997 
998  }
999  else if (strcmp(defel->defname, "reserve_wal") == 0)
1000  {
1001  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1002  ereport(ERROR,
1003  (errcode(ERRCODE_SYNTAX_ERROR),
1004  errmsg("conflicting or redundant options")));
1005 
1006  reserve_wal_given = true;
1007  *reserve_wal = defGetBoolean(defel);
1008  }
1009  else if (strcmp(defel->defname, "two_phase") == 0)
1010  {
1011  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1012  ereport(ERROR,
1013  (errcode(ERRCODE_SYNTAX_ERROR),
1014  errmsg("conflicting or redundant options")));
1015  two_phase_given = true;
1016  *two_phase = defGetBoolean(defel);
1017  }
1018  else
1019  elog(ERROR, "unrecognized option: %s", defel->defname);
1020  }
1021 }
int errcode(int sqlerrcode)
Definition: elog.c:698
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:106
#define ERROR
Definition: elog.h:46
char * defGetString(DefElem *def)
Definition: define.c:49
static bool two_phase
#define ereport(elevel,...)
Definition: elog.h:157
#define lfirst(lc)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
char * defname
Definition: parsenodes.h:758

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3389 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, LSNGetDatum, max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3390 {
3391 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3392  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3393  TupleDesc tupdesc;
3394  Tuplestorestate *tupstore;
3395  MemoryContext per_query_ctx;
3396  MemoryContext oldcontext;
3397  SyncRepStandbyData *sync_standbys;
3398  int num_standbys;
3399  int i;
3400 
3401  /* check to see if caller supports us returning a tuplestore */
3402  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3403  ereport(ERROR,
3404  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3405  errmsg("set-valued function called in context that cannot accept a set")));
3406  if (!(rsinfo->allowedModes & SFRM_Materialize))
3407  ereport(ERROR,
3408  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3409  errmsg("materialize mode required, but it is not allowed in this context")));
3410 
3411  /* Build a tuple descriptor for our result type */
3412  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3413  elog(ERROR, "return type must be a row type");
3414 
3415  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3416  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3417 
3418  tupstore = tuplestore_begin_heap(true, false, work_mem);
3419  rsinfo->returnMode = SFRM_Materialize;
3420  rsinfo->setResult = tupstore;
3421  rsinfo->setDesc = tupdesc;
3422 
3423  MemoryContextSwitchTo(oldcontext);
3424 
3425  /*
3426  * Get the currently active synchronous standbys. This could be out of
3427  * date before we're done, but we'll use the data anyway.
3428  */
3429  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3430 
3431  for (i = 0; i < max_wal_senders; i++)
3432  {
3433  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3435  XLogRecPtr write;
3436  XLogRecPtr flush;
3437  XLogRecPtr apply;
3438  TimeOffset writeLag;
3439  TimeOffset flushLag;
3440  TimeOffset applyLag;
3441  int priority;
3442  int pid;
3444  TimestampTz replyTime;
3445  bool is_sync_standby;
3447  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3448  int j;
3449 
3450  /* Collect data from shared memory */
3451  SpinLockAcquire(&walsnd->mutex);
3452  if (walsnd->pid == 0)
3453  {
3454  SpinLockRelease(&walsnd->mutex);
3455  continue;
3456  }
3457  pid = walsnd->pid;
3458  sentPtr = walsnd->sentPtr;
3459  state = walsnd->state;
3460  write = walsnd->write;
3461  flush = walsnd->flush;
3462  apply = walsnd->apply;
3463  writeLag = walsnd->writeLag;
3464  flushLag = walsnd->flushLag;
3465  applyLag = walsnd->applyLag;
3466  priority = walsnd->sync_standby_priority;
3467  replyTime = walsnd->replyTime;
3468  SpinLockRelease(&walsnd->mutex);
3469 
3470  /*
3471  * Detect whether walsender is/was considered synchronous. We can
3472  * provide some protection against stale data by checking the PID
3473  * along with walsnd_index.
3474  */
3475  is_sync_standby = false;
3476  for (j = 0; j < num_standbys; j++)
3477  {
3478  if (sync_standbys[j].walsnd_index == i &&
3479  sync_standbys[j].pid == pid)
3480  {
3481  is_sync_standby = true;
3482  break;
3483  }
3484  }
3485 
3486  memset(nulls, 0, sizeof(nulls));
3487  values[0] = Int32GetDatum(pid);
3488 
3489  if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3490  {
3491  /*
3492  * Only superusers and members of pg_read_all_stats can see
3493  * details. Other users only get the pid value to know it's a
3494  * walsender, but no details.
3495  */
3496  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3497  }
3498  else
3499  {
3500  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3501 
3502  if (XLogRecPtrIsInvalid(sentPtr))
3503  nulls[2] = true;
3504  values[2] = LSNGetDatum(sentPtr);
3505 
3506  if (XLogRecPtrIsInvalid(write))
3507  nulls[3] = true;
3508  values[3] = LSNGetDatum(write);
3509 
3510  if (XLogRecPtrIsInvalid(flush))
3511  nulls[4] = true;
3512  values[4] = LSNGetDatum(flush);
3513 
3514  if (XLogRecPtrIsInvalid(apply))
3515  nulls[5] = true;
3516  values[5] = LSNGetDatum(apply);
3517 
3518  /*
3519  * Treat a standby such as a pg_basebackup background process
3520  * which always returns an invalid flush location, as an
3521  * asynchronous standby.
3522  */
3523  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3524 
3525  if (writeLag < 0)
3526  nulls[6] = true;
3527  else
3528  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3529 
3530  if (flushLag < 0)
3531  nulls[7] = true;
3532  else
3533  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3534 
3535  if (applyLag < 0)
3536  nulls[8] = true;
3537  else
3538  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3539 
3540  values[9] = Int32GetDatum(priority);
3541 
3542  /*
3543  * More easily understood version of standby state. This is purely
3544  * informational.
3545  *
3546  * In quorum-based sync replication, the role of each standby
3547  * listed in synchronous_standby_names can be changing very
3548  * frequently. Any standbys considered as "sync" at one moment can
3549  * be switched to "potential" ones at the next moment. So, it's
3550  * basically useless to report "sync" or "potential" as their sync
3551  * states. We report just "quorum" for them.
3552  */
3553  if (priority == 0)
3554  values[10] = CStringGetTextDatum("async");
3555  else if (is_sync_standby)
3557  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3558  else
3559  values[10] = CStringGetTextDatum("potential");
3560 
3561  if (replyTime == 0)
3562  nulls[11] = true;
3563  else
3564  values[11] = TimestampTzGetDatum(replyTime);
3565  }
3566 
3567  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3568  }
3569 
3570  /* clean up and return the tuplestore */
3571  tuplestore_donestoring(tupstore);
3572 
3573  return (Datum) 0;
3574 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:589
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:495
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:69
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:1008
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:46
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:411
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3354
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:124
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:306
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4869
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:308
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:317
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:234
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:311
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:725
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:304
#define Int32GetDatum(X)
Definition: postgres.h:523
TupleDesc setDesc
Definition: execnodes.h:312
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:86
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3373
XLogRecPtr apply

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1961 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1962 {
1963  bool changed = false;
1965 
1966  Assert(lsn != InvalidXLogRecPtr);
1967  SpinLockAcquire(&slot->mutex);
1968  if (slot->data.restart_lsn != lsn)
1969  {
1970  changed = true;
1971  slot->data.restart_lsn = lsn;
1972  }
1973  SpinLockRelease(&slot->mutex);
1974 
1975  if (changed)
1976  {
1979  }
1980 
1981  /*
1982  * One could argue that the slot should be saved to disk now, but that'd
1983  * be energy wasted - the worst lost information can do here is give us
1984  * wrong information in a statistics view - we'll just potentially be more
1985  * conservative in removing files.
1986  */
1987 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:147
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:817
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:73
slock_t mutex
Definition: slot.h:120
void ReplicationSlotMarkDirty(void)
Definition: slot.c:728

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2098 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

2099 {
2100  bool changed = false;
2101  ReplicationSlot *slot = MyReplicationSlot;
2102 
2103  SpinLockAcquire(&slot->mutex);
2104  MyProc->xmin = InvalidTransactionId;
2105 
2106  /*
2107  * For physical replication we don't need the interlock provided by xmin
2108  * and effective_xmin since the consequences of a missed increase are
2109  * limited to query cancellations, so set both at once.
2110  */
2111  if (!TransactionIdIsNormal(slot->data.xmin) ||
2112  !TransactionIdIsNormal(feedbackXmin) ||
2113  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2114  {
2115  changed = true;
2116  slot->data.xmin = feedbackXmin;
2117  slot->effective_xmin = feedbackXmin;
2118  }
2119  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2120  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2121  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2122  {
2123  changed = true;
2124  slot->data.catalog_xmin = feedbackCatalogXmin;
2125  slot->effective_catalog_xmin = feedbackCatalogXmin;
2126  }
2127  SpinLockRelease(&slot->mutex);
2128 
2129  if (changed)
2130  {
2131  ReplicationSlotMarkDirty();
2132  ReplicationSlotsComputeRequiredXmin(false);
2133  }
2134 }
PGPROC * MyProc
Definition: proc.c:68
ReplicationSlotPersistentData data
Definition: slot.h:147
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:143
TransactionId xmin
Definition: proc.h:138
TransactionId catalog_xmin
Definition: slot.h:70
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:62
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:120
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:767
void ReplicationSlotMarkDirty(void)
Definition: slot.c:728

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1817 of file walsender.c.

References Assert, COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1818 {
1819  unsigned char firstchar;
1820  int maxmsglen;
1821  int r;
1822  bool received = false;
1823 
1824  last_processing = GetCurrentTimestamp();
1825 
1826  /*
1827  * If we already received a CopyDone from the frontend, any subsequent
1828  * message is the beginning of a new command, and should be processed in
1829  * the main processing loop.
1830  */
1831  while (!streamingDoneReceiving)
1832  {
1833  pq_startmsgread();
1834  r = pq_getbyte_if_available(&firstchar);
1835  if (r < 0)
1836  {
1837  /* unexpected error or EOF */
1838  ereport(COMMERROR,
1839  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1840  errmsg("unexpected EOF on standby connection")));
1841  proc_exit(0);
1842  }
1843  if (r == 0)
1844  {
1845  /* no data available without blocking */
1846  pq_endmsgread();
1847  break;
1848  }
1849 
1850  /* Validate message type and set packet size limit */
1851  switch (firstchar)
1852  {
1853  case 'd':
1854  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1855  break;
1856  case 'c':
1857  case 'X':
1858  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1859  break;
1860  default:
1861  ereport(FATAL,
1862  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1863  errmsg("invalid standby message type \"%c\"",
1864  firstchar)));
1865  maxmsglen = 0; /* keep compiler quiet */
1866  break;
1867  }
1868 
1869  /* Read the message contents */
1870  resetStringInfo(&reply_message);
1871  if (pq_getmessage(&reply_message, maxmsglen))
1872  {
1873  ereport(COMMERROR,
1874  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1875  errmsg("unexpected EOF on standby connection")));
1876  proc_exit(0);
1877  }
1878 
1879  /* ... and process it */
1880  switch (firstchar)
1881  {
1882  /*
1883  * 'd' means a standby reply wrapped in a CopyData packet.
1884  */
1885  case 'd':
1886  ProcessStandbyMessage();
1887  received = true;
1888  break;
1889 
1890  /*
1891  * CopyDone means the standby requested to finish streaming.
1892  * Reply with CopyDone, if we had not sent that already.
1893  */
1894  case 'c':
1895  if (!streamingDoneSending)
1896  {
1897  pq_putmessage_noblock('c', NULL, 0);
1898  streamingDoneSending = true;
1899  }
1900 
1901  streamingDoneReceiving = true;
1902  received = true;
1903  break;
1904 
1905  /*
1906  * 'X' means that the standby is closing down the socket.
1907  */
1908  case 'X':
1909  proc_exit(0);
1910 
1911  default:
1912  Assert(false); /* NOT REACHED */
1913  }
1914  }
1915 
1916  /*
1917  * Save the last reply timestamp if we've received at least one reply.
1918  */
1919  if (received)
1920  {
1921  last_reply_timestamp = last_processing;
1922  waiting_for_ping_response = false;
1923  }
1924 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1930
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1152
#define FATAL
Definition: elog.h:49
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1034
static bool streamingDoneSending
Definition: walsender.c:179
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1214
static StringInfoData reply_message
Definition: walsender.c:158
void pq_endmsgread(void)
Definition: pqcomm.c:1176
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
static bool streamingDoneReceiving
Definition: walsender.c:180
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2178 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

2179 {
2180  TransactionId feedbackXmin;
2181  uint32 feedbackEpoch;
2182  TransactionId feedbackCatalogXmin;
2183  uint32 feedbackCatalogEpoch;
2184  TimestampTz replyTime;
2185 
2186  /*
2187  * Decipher the reply message. The caller already consumed the msgtype
2188  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2189  * of this message.
2190  */
2191  replyTime = pq_getmsgint64(&reply_message);
2192  feedbackXmin = pq_getmsgint(&reply_message, 4);
2193  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2194  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2195  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2196 
2197  if (message_level_is_interesting(DEBUG2))
2198  {
2199  char *replyTimeStr;
2200 
2201  /* Copy because timestamptz_to_str returns a static buffer */
2202  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2203 
2204  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2205  feedbackXmin,
2206  feedbackEpoch,
2207  feedbackCatalogXmin,
2208  feedbackCatalogEpoch,
2209  replyTimeStr);
2210 
2211  pfree(replyTimeStr);
2212  }
2213 
2214  /*
2215  * Update shared state for this WalSender process based on reply data from
2216  * standby.
2217  */
2218  {
2219  WalSnd *walsnd = MyWalSnd;
2220 
2221  SpinLockAcquire(&walsnd->mutex);
2222  walsnd->replyTime = replyTime;
2223  SpinLockRelease(&walsnd->mutex);
2224  }
2225 
2226  /*
2227  * Unset WalSender's xmins if the feedback message values are invalid.
2228  * This happens when the downstream turned hot_standby_feedback off.
2229  */
2230  if (!TransactionIdIsNormal(feedbackXmin)
2231  && !TransactionIdIsNormal(feedbackCatalogXmin))
2232  {
2233  MyProc->xmin = InvalidTransactionId;
2234  if (MyReplicationSlot != NULL)
2235  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2236  return;
2237  }
2238 
2239  /*
2240  * Check that the provided xmin/epoch are sane, that is, not in the future
2241  * and not so far back as to be already wrapped around. Ignore if not.
2242  */
2243  if (TransactionIdIsNormal(feedbackXmin) &&
2244  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2245  return;
2246 
2247  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2248  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2249  return;
2250 
2251  /*
2252  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2253  * the xmin will be taken into account by GetSnapshotData() /
2254  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2255  * thereby prevent the generation of cleanup conflicts on the standby
2256  * server.
2257  *
2258  * There is a small window for a race condition here: although we just
2259  * checked that feedbackXmin precedes nextXid, the nextXid could have
2260  * gotten advanced between our fetching it and applying the xmin below,
2261  * perhaps far enough to make feedbackXmin wrap around. In that case the
2262  * xmin we set here would be "in the future" and have no effect. No point
2263  * in worrying about this since it's too late to save the desired data
2264  * anyway. Assuming that the standby sends us an increasing sequence of
2265  * xmins, this could only happen during the first reply cycle, else our
2266  * own xmin would prevent nextXid from advancing so far.
2267  *
2268  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2269  * is assumed atomic, and there's no real need to prevent concurrent
2270  * horizon determinations. (If we're moving our xmin forward, this is
2271  * obviously safe, and if we're moving it backwards, well, the data is at
2272  * risk already since a VACUUM could already have determined the horizon.)
2273  *
2274  * If we're using a replication slot we reserve the xmin via that,
2275  * otherwise via the walsender's PGPROC entry. We can only track the
2276  * catalog xmin separately when using a slot, so we store the least of the
2277  * two provided when not using a slot.
2278  *
2279  * XXX: It might make sense to generalize the ephemeral slot concept and
2280  * always use the slot mechanism to handle the feedback xmin.
2281  */
2282  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2283  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2284  else
2285  {
2286  if (TransactionIdIsNormal(feedbackCatalogXmin)
2287  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2288  MyProc->xmin = feedbackCatalogXmin;
2289  else
2290  MyProc->xmin = feedbackXmin;
2291  }
2292 }
uint32 TransactionId
Definition: c.h:587
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
slock_t mutex
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2147
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1169
TransactionId xmin
Definition: proc.h:138
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:441
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2098
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1930 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1931 {
1932  char msgtype;
1933 
1934  /*
1935  * Check message type from the first byte.
1936  */
1937  msgtype = pq_getmsgbyte(&reply_message);
1938 
1939  switch (msgtype)
1940  {
1941  case 'r':
1942  ProcessStandbyReplyMessage();
1943  break;
1944 
1945  case 'h':
1946  ProcessStandbyHSFeedbackMessage();
1947  break;
1948 
1949  default:
1950  ereport(COMMERROR,
1951  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1952  errmsg("unexpected message type \"%c\"", msgtype)));
1953  proc_exit(0);
1954  }
1955 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define COMMERROR
Definition: elog.h:30
static StringInfoData reply_message
Definition: walsender.c:158
#define ereport(elevel,...)
Definition: elog.h:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1993
int errmsg(const char *fmt,...)
Definition: elog.c:909
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2178

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1993 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1994 {
1995  XLogRecPtr writePtr,
1996  flushPtr,
1997  applyPtr;
1998  bool replyRequested;
1999  TimeOffset writeLag,
2000  flushLag,
2001  applyLag;
2002  bool clearLagTimes;
2003  TimestampTz now;
2004  TimestampTz replyTime;
2005 
2006  static bool fullyAppliedLastTime = false;
2007 
2008  /* the caller already consumed the msgtype byte */
2009  writePtr = pq_getmsgint64(&reply_message);
2010  flushPtr = pq_getmsgint64(&reply_message);
2011  applyPtr = pq_getmsgint64(&reply_message);
2012  replyTime = pq_getmsgint64(&reply_message);
2013  replyRequested = pq_getmsgbyte(&reply_message);
2014 
2015  if (message_level_is_interesting(DEBUG2))
2016  {
2017  char *replyTimeStr;
2018 
2019  /* Copy because timestamptz_to_str returns a static buffer */
2020  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2021 
2022  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2023  LSN_FORMAT_ARGS(writePtr),
2024  LSN_FORMAT_ARGS(flushPtr),
2025  LSN_FORMAT_ARGS(applyPtr),
2026  replyRequested ? " (reply requested)" : "",
2027  replyTimeStr);
2028 
2029  pfree(replyTimeStr);
2030  }
2031 
2032  /* See if we can compute the round-trip lag for these positions. */
2033  now = GetCurrentTimestamp();
2034  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2035  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2036  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2037 
2038  /*
2039  * If the standby reports that it has fully replayed the WAL in two
2040  * consecutive reply messages, then the second such message must result
2041  * from wal_receiver_status_interval expiring on the standby. This is a
2042  * convenient time to forget the lag times measured when it last
2043  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2044  * until more WAL traffic arrives.
2045  */
2046  clearLagTimes = false;
2047  if (applyPtr == sentPtr)
2048  {
2049  if (fullyAppliedLastTime)
2050  clearLagTimes = true;
2051  fullyAppliedLastTime = true;
2052  }
2053  else
2054  fullyAppliedLastTime = false;
2055 
2056  /* Send a reply if the standby requested one. */
2057  if (replyRequested)
2058  WalSndKeepalive(false);
2059 
2060  /*
2061  * Update shared state for this WalSender process based on reply data from
2062  * standby.
2063  */
2064  {
2065  WalSnd *walsnd = MyWalSnd;
2066 
2067  SpinLockAcquire(&walsnd->mutex);
2068  walsnd->write = writePtr;
2069  walsnd->flush = flushPtr;
2070  walsnd->apply = applyPtr;
2071  if (writeLag != -1 || clearLagTimes)
2072  walsnd->writeLag = writeLag;
2073  if (flushLag != -1 || clearLagTimes)
2074  walsnd->flushLag = flushLag;
2075  if (applyLag != -1 || clearLagTimes)
2076  walsnd->applyLag = applyLag;
2077  walsnd->replyTime = replyTime;
2078  SpinLockRelease(&walsnd->mutex);
2079  }
2080 
2081  if (!am_cascading_walsender)
2082  SyncRepReleaseWaiters();
2083 
2084  /*
2085  * Advance our local xmin horizon when the client confirmed a flush.
2086  */
2087  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
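2088  {
2089  if (SlotIsLogical(MyReplicationSlot))
2090  LogicalConfirmReceivedLocation(flushPtr);
2091  else
2092  PhysicalConfirmReceivedLocation(flushPtr);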
2093  }
2094 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1169
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3585
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1961
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
#define SlotIsLogical(slot)
Definition: slot.h:169
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3711
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1706
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:440
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
bool am_cascading_walsender
Definition: walsender.c:116
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 463 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), MemSet, ReplicationSlot::mutex, ReplicationSlotPersistentData::name, NameStr, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

464 {
465 #define READ_REPLICATION_SLOT_COLS 3
466  ReplicationSlot *slot;
467  DestReceiver *dest;
468  TupOutputState *tstate;
469  TupleDesc tupdesc;
470  Datum values[READ_REPLICATION_SLOT_COLS];
471  bool nulls[READ_REPLICATION_SLOT_COLS];
472 
473  tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
474  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
475  TEXTOID, -1, 0);
476  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
477  TEXTOID, -1, 0);
478  /* TimeLineID is unsigned, so int4 is not wide enough. */
479  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
480  INT8OID, -1, 0);
481 
482  MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum));
483  MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
484 
485  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
486  slot = SearchNamedReplicationSlot(cmd->slotname, false);
487  if (slot == NULL || !slot->in_use)
488  {
489  LWLockRelease(ReplicationSlotControlLock);
490  }
491  else
492  {
493  ReplicationSlot slot_contents;
494  int i = 0;
495 
496  /* Copy slot contents while holding spinlock */
497  SpinLockAcquire(&slot->mutex);
498  slot_contents = *slot;
499  SpinLockRelease(&slot->mutex);
500  LWLockRelease(ReplicationSlotControlLock);
501 
502  if (OidIsValid(slot_contents.data.database))
503  ereport(ERROR,
504  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
505  errmsg("cannot use \"%s\" with logical replication slot \"%s\"",
506  "READ_REPLICATION_SLOT",
507  NameStr(slot_contents.data.name)));
508 
509  /* slot type */
510  values[i] = CStringGetTextDatum("physical");
511  nulls[i] = false;
512  i++;
513 
514  /* start LSN */
515  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
516  {
517  char xloc[64];
518 
519  snprintf(xloc, sizeof(xloc), "%X/%X",
520  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
521  values[i] = CStringGetTextDatum(xloc);
522  nulls[i] = false;
523  }
524  i++;
525 
526  /* timeline this WAL was produced on */
527  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
528  {
529  TimeLineID slots_position_timeline;
530  TimeLineID current_timeline;
531  List *timeline_history = NIL;
532 
533  /*
534  * While in recovery, use as timeline the currently-replaying one
535  * to get the LSN position's history.
536  */
537  if (RecoveryInProgress())
538  (void) GetXLogReplayRecPtr(&current_timeline);
539  else
540  current_timeline = ThisTimeLineID;
541 
542  timeline_history = readTimeLineHistory(current_timeline);
543  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
544  timeline_history);
545  values[i] = Int64GetDatum((int64) slots_position_timeline);
546  nulls[i] = false;
547  }
548  i++;
549 
550  Assert(i == READ_REPLICATION_SLOT_COLS);
551  }
552 
553  dest = CreateDestReceiver(DestRemoteSimple);
554  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
555  do_tup_output(tstate, values, nulls);
556  end_tup_output(tstate);
557 }
#define NIL
Definition: pg_list.h:65
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:348
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:552
uint32 TimeLineID
Definition: xlogdefs.h:59
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
#define READ_REPLICATION_SLOT_COLS
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
int errcode(int sqlerrcode)
Definition: elog.c:698
#define MemSet(start, val, len)
Definition: c.h:1008
ReplicationSlotPersistentData data
Definition: slot.h:147
bool RecoveryInProgress(void)
Definition: xlog.c:8341
#define OidIsValid(objectId)
Definition: c.h:710
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
#define ERROR
Definition: elog.h:46
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11942
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1697
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
bool in_use
Definition: slot.h:123
#define SpinLockRelease(lock)
Definition: spin.h:64
uintptr_t Datum
Definition: postgres.h:411
TimeLineID ThisTimeLineID
Definition: xlog.c:195
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:73
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
static Datum values[MAXATTR]
Definition: bootstrap.c:156
int errmsg(const char *fmt,...)
Definition: elog.c:909
int i
#define NameStr(name)
Definition: c.h:681
#define CStringGetTextDatum(s)
Definition: builtins.h:86
slock_t mutex
Definition: slot.h:120
Definition: pg_list.h:50
#define snprintf
Definition: port.h:217
int16 AttrNumber
Definition: attnum.h:21

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 564 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

565 {
566  StringInfoData buf;
567  char histfname[MAXFNAMELEN];
568  char path[MAXPGPATH];
569  int fd;
570  off_t histfilelen;
571  off_t bytesleft;
572  Size len;
573 
574  /*
575  * Reply with a result set with one row, and two columns. The first col is
576  * the name of the history file, 2nd is the contents.
577  */
578 
579  TLHistoryFileName(histfname, cmd->timeline);
580  TLHistoryFilePath(path, cmd->timeline);
581 
582  /* Send a RowDescription message */
583  pq_beginmessage(&buf, 'T');
584  pq_sendint16(&buf, 2); /* 2 fields */
585 
586  /* first field */
587  pq_sendstring(&buf, "filename"); /* col name */
588  pq_sendint32(&buf, 0); /* table oid */
589  pq_sendint16(&buf, 0); /* attnum */
590  pq_sendint32(&buf, TEXTOID); /* type oid */
591  pq_sendint16(&buf, -1); /* typlen */
592  pq_sendint32(&buf, 0); /* typmod */
593  pq_sendint16(&buf, 0); /* format code */
594 
595  /* second field */
596  pq_sendstring(&buf, "content"); /* col name */
597  pq_sendint32(&buf, 0); /* table oid */
598  pq_sendint16(&buf, 0); /* attnum */
599  pq_sendint32(&buf, TEXTOID); /* type oid */
600  pq_sendint16(&buf, -1); /* typlen */
601  pq_sendint32(&buf, 0); /* typmod */
602  pq_sendint16(&buf, 0); /* format code */
603  pq_endmessage(&buf);
604 
605  /* Send a DataRow message */
606  pq_beginmessage(&buf, 'D');
607  pq_sendint16(&buf, 2); /* # of columns */
608  len = strlen(histfname);
609  pq_sendint32(&buf, len); /* col1 len */
610  pq_sendbytes(&buf, histfname, len);
611 
612  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
613  if (fd < 0)
614  ereport(ERROR,
615  (errcode_for_file_access(),
616  errmsg("could not open file \"%s\": %m", path)));
617 
618  /* Determine file length and send it to client */
619  histfilelen = lseek(fd, 0, SEEK_END);
620  if (histfilelen < 0)
621  ereport(ERROR,
622  (errcode_for_file_access(),
623  errmsg("could not seek to end of file \"%s\": %m", path)));
624  if (lseek(fd, 0, SEEK_SET) != 0)
625  ereport(ERROR,
626  (errcode_for_file_access(),
627  errmsg("could not seek to beginning of file \"%s\": %m", path)));
628 
629  pq_sendint32(&buf, histfilelen); /* col2 len */
630 
631  bytesleft = histfilelen;
632  while (bytesleft > 0)
633  {
634  PGAlignedBlock rbuf;
635  int nread;
636 
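637  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
638  nread = read(fd, rbuf.data, sizeof(rbuf));
639  pgstat_report_wait_end();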
640  if (nread < 0)
641  ereport(ERROR,
642  (errcode_for_file_access(),
643  errmsg("could not read file \"%s\": %m",
644  path)));
645  else if (nread == 0)
646  ereport(ERROR,
647  (errcode(ERRCODE_DATA_CORRUPTED),
648  errmsg("could not read file \"%s\": read %d of %zu",
649  path, nread, (Size) bytesleft)));
650 
651  pq_sendbytes(&buf, rbuf.data, nread);
652  bytesleft -= nread;
653  }
654 
655  if (CloseTransientFile(fd) != 0)
656  ereport(ERROR,
657  (errcode_for_file_access(),
658  errmsg("could not close file \"%s\": %m", path)));
659 
660  pq_endmessage(&buf);
661 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static void pgstat_report_wait_end(void)
Definition: wait_event.h:274
int errcode(int sqlerrcode)
Definition: elog.c:698
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:108
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1141
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2510
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:68
int errcode_for_file_access(void)
Definition: elog.c:721
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:258
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:47
int CloseTransientFile(int fd)
Definition: fd.c:2687
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:157
size_t Size
Definition: c.h:540
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1237 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, WalSnd::sentPtr, sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), XLogRecPtrIsInvalid, and XLogSendLogical().

Referenced by exec_replication_command().

1238 {
1240  QueryCompletion qc;
1241 
1242  /* make sure that our requirements are still fulfilled */
1244 
1246 
1247  ReplicationSlotAcquire(cmd->slotname, true);
1248 
1250  ereport(ERROR,
1251  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1252  errmsg("cannot read from logical replication slot \"%s\"",
1253  cmd->slotname),
1254  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1255 
1256  /*
1257  * Force a disconnect, so that the decoding code doesn't need to care
1258  * about an eventual switch from running in recovery, to running in a
1259  * normal environment. Client code is expected to handle reconnects.
1260  */
1262  {
1263  ereport(LOG,
1264  (errmsg("terminating walsender process after promotion")));
1265  got_STOPPING = true;
1266  }
1267 
1268  /*
1269  * Create our decoding context, making it start at the previously ack'ed
1270  * position.
1271  *
1272  * Do this before sending a CopyBothResponse message, so that any errors
1273  * are reported early.
1274  */
1276  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1277  XL_ROUTINE(.page_read = logical_read_xlog_page,
1278  .segment_open = WalSndSegmentOpen,
1279  .segment_close = wal_segment_close),
1283 
1285 
1286  /* Send a CopyBothResponse message, and start streaming */
1287  pq_beginmessage(&buf, 'W');
1288  pq_sendbyte(&buf, 0);
1289  pq_sendint16(&buf, 0);
1290  pq_endmessage(&buf);
1291  pq_flush();
1292 
1293  /* Start reading WAL from the oldest required WAL. */
1296 
1297  /*
1298  * Report the location after which we'll send out further commits as the
1299  * current sentPtr.
1300  */
1302 
1303  /* Also update the sent position status in shared memory */
1307 
1308  replication_active = true;
1309 
1311 
1312  /* Main loop of walsender */
1314 
1317 
1318  replication_active = false;
1319  if (got_STOPPING)
1320  proc_exit(0);
1322 
1323  /* Get out of COPY mode (CommandComplete). */
1324  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1325  EndCommand(&qc, DestRemote, false);
1326 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
#define pq_flush()
Definition: libpq.h:46
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:380
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
ReplicationSlotPersistentData data
Definition: slot.h:147
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8341
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:84
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:46
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:478
static char * buf
Definition: pg_test_fsync.c:68
int errdetail(const char *fmt,...)
Definition: elog.c:1042
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:243
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1364
void ReplicationSlotRelease(void)
Definition: slot.c:469
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:411
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2373
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1337
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2577
#define ereport(elevel,...)
Definition: elog.h:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3335
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:633
static void XLogSendLogical(void)
Definition: walsender.c:2956
XLogRecPtr restart_lsn
Definition: slot.h:73
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int errmsg(const char *fmt,...)
Definition: elog.c:909
XLogReaderState * reader
Definition: logical.h:41
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:905
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
bool am_cascading_walsender
Definition: walsender.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1448

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 670 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

671 {
673  XLogRecPtr FlushPtr;
674 
675  /* create xlogreader for physical replication */
676  xlogreader =
678  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
679  .segment_close = wal_segment_close),
680  NULL);
681 
682  if (!xlogreader)
683  ereport(ERROR,
684  (errcode(ERRCODE_OUT_OF_MEMORY),
685  errmsg("out of memory")));
686 
687  /*
688  * We assume here that we're logging enough information in the WAL for
689  * log-shipping, since this is checked in PostmasterMain().
690  *
691  * NOTE: wal_level can only change at shutdown, so in most cases it is
692  * difficult for there to be WAL data that we can still see that was
693  * written at wal_level='minimal'.
694  */
695 
696  if (cmd->slotname)
697  {
698  ReplicationSlotAcquire(cmd->slotname, true);
700  ereport(ERROR,
701  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
702  errmsg("cannot use a logical replication slot for physical replication")));
703 
704  /*
705  * We don't need to verify the slot's restart_lsn here; instead we
706  * rely on the caller requesting the starting point to use. If the
707  * WAL segment doesn't exist, we'll fail later.
708  */
709  }
710 
711  /*
712  * Select the timeline. If it was given explicitly by the client, use
713  * that. Otherwise use the timeline of the last replayed record, which is
714  * kept in ThisTimeLineID.
715  */
718  {
719  /* this also updates ThisTimeLineID */
720  FlushPtr = GetStandbyFlushRecPtr();
721  }
722  else
723  FlushPtr = GetFlushRecPtr();
724 
725  if (cmd->timeline != 0)
726  {
727  XLogRecPtr switchpoint;
728 
729  sendTimeLine = cmd->timeline;
731  {
732  sendTimeLineIsHistoric = false;
734  }
735  else
736  {
737  List *timeLineHistory;
738 
739  sendTimeLineIsHistoric = true;
740 
741  /*
742  * Check that the timeline the client requested exists, and the
743  * requested start location is on that timeline.
744  */
745  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
746  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
748  list_free_deep(timeLineHistory);
749 
750  /*
751  * Found the requested timeline in the history. Check that
752  * requested startpoint is on that timeline in our history.
753  *
754  * This is quite loose on purpose. We only check that we didn't
755  * fork off the requested timeline before the switchpoint. We
756  * don't check that we switched *to* it before the requested
757  * starting point. This is because the client can legitimately
758  * request to start replication from the beginning of the WAL
759  * segment that contains switchpoint, but on the new timeline, so
760  * that it doesn't end up with a partial segment. If you ask for
761  * too old a starting point, you'll get an error later when we
762  * fail to find the requested WAL segment in pg_wal.
763  *
764  * XXX: we could be more strict here and only allow a startpoint
765  * that's older than the switchpoint, if it's still in the same
766  * WAL segment.
767  */
768  if (!XLogRecPtrIsInvalid(switchpoint) &&
769  switchpoint < cmd->startpoint)
770  {
771  ereport(ERROR,
772  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
774  cmd->timeline),
775  errdetail("This server's history forked from timeline %u at %X/%X.",
776  cmd->timeline,
777  LSN_FORMAT_ARGS(switchpoint))));
778  }
779  sendTimeLineValidUpto = switchpoint;
780  }
781  }
782  else
783  {
786  sendTimeLineIsHistoric = false;
787  }
788 
790 
791  /* If there is nothing to stream, don't even enter COPY mode */
793  {
794  /*
795  * When we first start replication the standby will be behind the
796  * primary. For some applications, for example synchronous
797  * replication, it is important to have a clear state for this initial
798  * catchup mode, so we can trigger actions when we change streaming
799  * state later. We may stay in this state for a long time, which is
800  * exactly why we want to be able to monitor whether or not we are
801  * still here.
802  */
804 
805  /* Send a CopyBothResponse message, and start streaming */
806  pq_beginmessage(&buf, 'W');
807  pq_sendbyte(&buf, 0);
808  pq_sendint16(&buf, 0);
809  pq_endmessage(&buf);
810  pq_flush();
811 
812  /*
813  * Don't allow a request to stream from a future point in WAL that
814  * hasn't been flushed to disk in this server yet.
815  */
816  if (FlushPtr < cmd->startpoint)
817  {
818  ereport(ERROR,
819  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
821  LSN_FORMAT_ARGS(FlushPtr))));
822  }
823 
824  /* Start streaming from the requested point */
825  sentPtr = cmd->startpoint;
826 
827  /* Initialize shared memory status, too */
831 
833 
834  /* Main loop of walsender */
835  replication_active = true;
836 
838 
839  replication_active = false;
840  if (got_STOPPING)
841  proc_exit(0);
843 
845  }
846 
847  if (cmd->slotname)
849 
850  /*
851  * Copy is finished now. Send a single-row result set indicating the next
852  * timeline.
853  */
855  {
856  char startpos_str[8 + 1 + 8 + 1];
858  TupOutputState *tstate;
859  TupleDesc tupdesc;
860  Datum values[2];
861  bool nulls[2];
862 
863  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
865 
867  MemSet(nulls, false, sizeof(nulls));
868 
869  /*
870  * Need a tuple descriptor representing two columns. int8 may seem
871  * like a surprising data type for this, but in theory int4 would not
872  * be wide enough for this, as TimeLineID is unsigned.
873  */
874  tupdesc = CreateTemplateTupleDesc(2);
875  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
876  INT8OID, -1, 0);
877  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
878  TEXTOID, -1, 0);
879 
880  /* prepare for projection of tuple */
881  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
882 
883  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
884  values[1] = CStringGetTextDatum(startpos_str);
885 
886  /* send it to dest */
887  do_tup_output(tstate, values, nulls);
888 
889  end_tup_output(tstate);
890  }
891 
892  /* Send CommandComplete message */
893  EndReplicationCommand("START_STREAMING");
894 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
#define pq_flush()
Definition: libpq.h:46
int wal_segment_size
Definition: xlog.c:120
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:380
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
static void XLogSendPhysical(void)
Definition: walsender.c:2655
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define MemSet(start, val, len)
Definition: c.h:1008
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8698
bool RecoveryInProgress(void)
Definition: xlog.c:8341
void list_free_deep(List *list)
Definition: list.c:1405
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
#define ERROR
Definition: elog.h:46
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
static char * buf
Definition: pg_test_fsync.c:68
static bool streamingDoneSending
Definition: walsender.c:179
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:1042
void ReplicationSlotRelease(void)
Definition: slot.c:469
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:78
#define SlotIsLogical(slot)
Definition: slot.h:169
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1697
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:411
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2373
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:411
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2577
TimeLineID ThisTimeLineID
Definition: xlog.c:195
#define ereport(elevel,...)
Definition: elog.h:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3335
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
static bool streamingDoneReceiving
Definition: walsender.c:180
static Datum values[MAXATTR]
Definition: bootstrap.c:156
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:86
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:3076
Definition: pg_list.h:50
#define snprintf
Definition: port.h:217
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
bool am_cascading_walsender
Definition: walsender.c:116

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2147 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

2148 {
2149  FullTransactionId nextFullXid;
2150  TransactionId nextXid;
2151  uint32 nextEpoch;
2152 
2153  nextFullXid = ReadNextFullTransactionId();
2154  nextXid = XidFromFullTransactionId(nextFullXid);
2155  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2156 
2157  if (xid <= nextXid)
2158  {
2159  if (epoch != nextEpoch)
2160  return false;
2161  }
2162  else
2163  {
2164  if (epoch + 1 != nextEpoch)
2165  return false;
2166  }
2167 
2168  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2169  return false; /* epoch OK, but it's wrapped around */
2170 
2171  return true;
2172 }
uint32 TransactionId
Definition: c.h:587
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:441
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2346 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2347 {
2348  TimestampTz timeout;
2349 
2350  /* don't bail out if we're doing something that doesn't require timeouts */
2351  if (last_reply_timestamp <= 0)
2352  return;
2353 
2356 
2357  if (wal_sender_timeout > 0 && last_processing >= timeout)
2358  {
2359  /*
2360  * Since typically expiration of replication timeout means
2361  * communication problem, we don't send the error message to the
2362  * standby.
2363  */
2365  (errmsg("terminating walsender process due to replication timeout")));
2366 
2367  WalSndShutdown();
2368  }
2369 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:162
#define COMMERROR
Definition: elog.h:30
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2302 of file walsender.c.

References last_reply_timestamp, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2303 {
2304  long sleeptime = 10000; /* 10 s */
2305 
2307  {
2308  TimestampTz wakeup_time;
2309 
2310  /*
2311  * At the latest stop sleeping once wal_sender_timeout has been
2312  * reached.
2313  */
2316 
2317  /*
2318  * If no ping has been sent yet, wakeup when it's time to do so.
2319  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2320  * the timeout passed without a response.
2321  */
2324  wal_sender_timeout / 2);
2325 
2326  /* Compute relative time until wakeup. */
2327  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2328  }
2329 
2330  return sleeptime;
2331 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1693

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3036 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

3037 {
3038  XLogRecPtr replicatedPtr;
3039 
3040  /* ... let's just be real sure we're caught up ... */
3041  send_data();
3042 
3043  /*
3044  * To figure out whether all WAL has successfully been replicated, check
3045  * flush location if valid, write otherwise. Tools like pg_receivewal will
3046  * usually (unless in synchronous mode) return an invalid flush location.
3047  */
3048  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3050 
3051  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3052  !pq_is_send_pending())
3053  {
3054  QueryCompletion qc;
3055 
3056  /* Inform the standby that XLOG streaming is done */
3057  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3058  EndCommand(&qc, DestRemote, false);
3059  pq_flush();
3060 
3061  proc_exit(0);
3062  }
3064  WalSndKeepalive(true);
3065 }
#define pq_is_send_pending()
Definition: libpq.h:48
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:46
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3585
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 297 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

298 {
302 
303  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
305 
306  if (MyReplicationSlot != NULL)
308 
310 
311  replication_active = false;
312 
313  /*
314  * If there is a transaction in progress, it will clean up our
315  * ResourceOwner, but if a replication command set up a resource owner
316  * without a transaction, we've got to clean that up now.
317  */
319  WalSndResourceCleanup(false);
320 
321  if (got_STOPPING || got_SIGUSR2)
322  proc_exit(0);
323 
324  /* Revert back to startup state */
326 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
static void pgstat_report_wait_end(void)
Definition: wait_event.h:274
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4721
WALOpenSegment seg
Definition: xlogreader.h:225
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:469
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:332
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3335
void ReplicationSlotCleanup(void)
Definition: slot.c:525
void LWLockReleaseAll(void)
Definition: lwlock.c:1902

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3354 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3355 {
3356  switch (state)
3357  {
3358  case WALSNDSTATE_STARTUP:
3359  return "startup";
3360  case WALSNDSTATE_BACKUP:
3361  return "backup";
3362  case WALSNDSTATE_CATCHUP:
3363  return "catchup";
3364  case WALSNDSTATE_STREAMING:
3365  return "streaming";
3366  case WALSNDSTATE_STOPPING:
3367  return "stopping";
3368  }
3369  return "UNKNOWN";
3370 }
Definition: regguts.h:317

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3271 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3272 {
3273  int i;
3274 
3275  for (i = 0; i < max_wal_senders; i++)
3276  {
3277  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3278  pid_t pid;
3279 
3280  SpinLockAcquire(&walsnd->mutex);
3281  pid = walsnd->pid;
3282  SpinLockRelease(&walsnd->mutex);
3283 
3284  if (pid == 0)
3285  continue;
3286 
3288  }
3289 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3585 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, and waiting_for_ping_response.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3586 {
3587  elog(DEBUG2, "sending replication keepalive");
3588 
3589  /* construct the message... */
3591  pq_sendbyte(&output_message, 'k');
3594  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3595 
3596  /* ... and send it wrapped in CopyData */
3598 
3599  /* Set local flag */
3600  if (requestReply)
3602 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static StringInfoData output_message
Definition: walsender.c:157
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
#define elog(elevel,...)
Definition: elog.h:232
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3608 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3609 {
3610  TimestampTz ping_time;
3611 
3612  /*
3613  * Don't send keepalive messages if timeouts are globally disabled or
3614  * we're doing something not partaking in timeouts.
3615  */
3616  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3617  return;
3618 
3620  return;
3621 
3622  /*
3623  * If half of wal_sender_timeout has lapsed without receiving any reply
3624  * from the standby, send a keep-alive message to the standby requesting
3625  * an immediate reply.
3626  */
3628  wal_sender_timeout / 2);
3629  if (last_processing >= ping_time)
3630  {
3631  WalSndKeepalive(true);
3632 
3633  /* Try to flush pending output to the client */
3634  if (pq_flush_if_writable() != 0)
3635  WalSndShutdown();
3636  }
3637 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3585
static TimestampTz last_processing
Definition: walsender.c:162
#define pq_flush_if_writable()
Definition: libpq.h:47
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2559 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2560 {
2561  WalSnd *walsnd = MyWalSnd;
2562 
2563  Assert(walsnd != NULL);
2564 
2565  MyWalSnd = NULL;
2566 
2567  SpinLockAcquire(&walsnd->mutex);
2568  /* clear latch while holding the spinlock, so it can safely be read */
2569  walsnd->latch = NULL;
2570  /* Mark WalSnd struct as no longer being in use. */
2571  walsnd->pid = 0;
2572  SpinLockRelease(&walsnd->mutex);
2573 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:804

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3151 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

3152 {
3153  int save_errno = errno;
3154 
3155  got_SIGUSR2 = true;
3156  SetLatch(MyLatch);
3157 
3158  errno = save_errno;
3159 }
void SetLatch(Latch *latch)
Definition: latch.c:567
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
struct Latch * MyLatch
Definition: globals.c:57

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2373 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

2374 {
2375  /*
2376  * Initialize the last reply timestamp. That enables timeout processing
2377  * from hereon.
2378  */
2380  waiting_for_ping_response = false;
2381 
2382  /*
2383  * Loop until we reach the end of this timeline or the client requests to
2384  * stop streaming.
2385  */
2386  for (;;)
2387  {
2388  /* Clear any already-pending wakeups */
2390 
2392 
2393  /* Process any requests or signals received recently */
2394  if (ConfigReloadPending)
2395  {
2396  ConfigReloadPending = false;
2399  }
2400 
2401  /* Check for input from the client */
2403 
2404  /*
2405  * If we have received CopyDone from the client, sent CopyDone
2406  * ourselves, and the output buffer is empty, it's time to exit
2407  * streaming.
2408  */
2410  !pq_is_send_pending())
2411  break;
2412 
2413  /*
2414  * If we don't have any pending data in the output buffer, try to send
2415  * some more. If there is some, we don't bother to call send_data
2416  * again until we've flushed it ... but we'd better assume we are not
2417  * caught up.
2418  */
2419  if (!pq_is_send_pending())
2420  send_data();
2421  else
2422  WalSndCaughtUp = false;
2423 
2424  /* Try to flush pending output to the client */
2425  if (pq_flush_if_writable() != 0)
2426  WalSndShutdown();
2427 
2428  /* If nothing remains to be sent right now ... */
2430  {
2431  /*
2432  * If we're in catchup state, move to streaming. This is an
2433  * important state change for users to know about, since before
2434  * this point data loss might occur if the primary dies and we
2435  * need to failover to the standby. The state change is also
2436  * important for synchronous replication, since commits that
2437  * started to wait at that point might wait for some time.
2438  */
2440  {
2441  ereport(DEBUG1,
2442  (errmsg_internal("\"%s\" has now caught up with upstream server",
2443  application_name)));
2445  }
2446 
2447  /*
2448  * When SIGUSR2 arrives, we send any outstanding logs up to the
2449  * shutdown checkpoint record (i.e., the latest record), wait for
2450  * them to be replicated to the standby, and exit. This may be a
2451  * normal termination at shutdown, or a promotion, the walsender
2452  * is not sure which.
2453  */
2454  if (got_SIGUSR2)
2455  WalSndDone(send_data);
2456  }
2457 
2458  /* Check for replication timeout. */
2460 
2461  /* Send keepalive if the time has come */
2463 
2464  /*
2465  * Block if we have unsent data. XXX For logical replication, let
2466  * WalSndWaitForWal() handle any other blocking; idle receivers need
2467  * its additional actions. For physical replication, also block if
2468  * caught up; its send_data does not block.
2469  */
2470  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2473  {
2474  long sleeptime;
2475  int wakeEvents;
2476 
2478  wakeEvents = WL_SOCKET_READABLE;
2479  else
2480  wakeEvents = 0;
2481 
2482  /*
2483  * Use fresh timestamp, not last_processing, to reduce the chance
2484  * of reaching wal_sender_timeout before sending a keepalive.
2485  */
2487 
2488  if (pq_is_send_pending())
2489  wakeEvents |= WL_SOCKET_WRITEABLE;
2490 
2491  /* Sleep until something happens or we time out */
2492  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2493  }
2494  }
2495 }
#define pq_is_send_pending()
Definition: libpq.h:48
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3254
#define DEBUG1
Definition: elog.h:25
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3036
#define WL_SOCKET_READABLE
Definition: latch.h:126
void ResetLatch(Latch *latch)
Definition: latch.c:660
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:47
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SyncRepInitConfig(void)
Definition: syncrep.c:411
static bool WalSndCaughtUp
Definition: walsender.c:183
@ PGC_SIGHUP
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3608
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2346
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2302
void WalSndSetState(WalSndState state)
Definition: walsender.c:3335
static void XLogSendLogical(void)
Definition: walsender.c:2956
static bool streamingDoneReceiving
Definition: walsender.c:180
char * application_name
Definition: guc.c:622
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1817

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1337 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1338 {
1339  /* can't have sync rep confused by sending the same LSN several times */
1340  if (!last_write)
1341  lsn = InvalidXLogRecPtr;
1342 
1343  resetStringInfo(ctx->out);
1344 
1345  pq_sendbyte(ctx->out, 'w');
1346  pq_sendint64(ctx->out, lsn); /* dataStart */
1347  pq_sendint64(ctx->out, lsn); /* walEnd */
1348 
1349  /*
1350  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1351  * reserve space here.
1352  */
1353  pq_sendint64(ctx->out, 0); /* sendtime */
1354 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
StringInfo out
Definition: logical.h:70

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 332 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

333 {
334  ResourceOwner resowner;
335 
336  if (CurrentResourceOwner == NULL)
337  return;
338 
339  /*
340  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
341  * in a local variable and clear it first.
342  */
343  resowner = CurrentResourceOwner;
344  CurrentResourceOwner = NULL;
345 
346  /* Now we can release resources and delete it. */
347  ResourceOwnerRelease(resowner,
348  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
349  ResourceOwnerRelease(resowner,
350  RESOURCE_RELEASE_LOCKS, isCommit, true);
351  ResourceOwnerRelease(resowner,
352  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
353  ResourceOwnerDelete(resowner);
354 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3106 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

3107 {
3108  int i;
3109 
3110  for (i = 0; i < max_wal_senders; i++)
3111  {
3112  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3113 
3114  SpinLockAcquire(&walsnd->mutex);
3115  if (walsnd->pid == 0)
3116  {
3117  SpinLockRelease(&walsnd->mutex);
3118  continue;
3119  }
3120  walsnd->needreload = true;
3121  SpinLockRelease(&walsnd->mutex);
3122  }
3123 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2577 of file walsender.c.

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

2579 {
2580  char path[MAXPGPATH];
2581 
2582  /*-------
2583  * When reading from a historic timeline, and there is a timeline switch
2584  * within this segment, read from the WAL segment belonging to the new
2585  * timeline.
2586  *
2587  * For example, imagine that this server is currently on timeline 5, and
2588  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2589  * 0/13002088. In pg_wal, we have these files:
2590  *
2591  * ...
2592  * 000000040000000000000012
2593  * 000000040000000000000013
2594  * 000000050000000000000013
2595  * 000000050000000000000014
2596  * ...
2597  *
2598  * In this situation, when requested to send the WAL from segment 0x13, on
2599  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2600  * recovery prefers files from newer timelines, so if the segment was
2601  * restored from the archive on this server, the file belonging to the old
2602  * timeline, 000000040000000000000013, might not exist. Their contents are
2603  * equal up to the switchpoint, because at a timeline switch, the used
2604  * portion of the old segment is copied to the new file. -------
2605  */
2606  *tli_p = sendTimeLine;
2608  {
2609  XLogSegNo endSegNo;
2610 
2612  if (nextSegNo == endSegNo)
2613  *tli_p = sendTimeLineNextTLI;
2614  }
2615 
2616  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2617  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2618  if (state->seg.ws_file >= 0)
2619  return;
2620 
2621  /*
2622  * If the file is not found, assume it's because the standby asked for a
2623  * too old WAL segment that has already been removed or recycled.
2624  */
2625  if (errno == ENOENT)
2626  {
2627  char xlogfname[MAXFNAMELEN];
2628  int save_errno = errno;
2629 
2630  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2631  errno = save_errno;
2632  ereport(ERROR,
2634  errmsg("requested WAL segment %s has already been removed",
2635  xlogfname)));
2636  }
2637  else
2638  ereport(ERROR,
2640  errmsg("could not open file \"%s\": %m",
2641  path)));
2642 }
int wal_segment_size
Definition: xlog.c:120
#define PG_BINARY
Definition: c.h:1271
WALOpenSegment seg
Definition: xlogreader.h:225
#define ERROR
Definition: elog.h:46
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:48
int errcode_for_file_access(void)
Definition: elog.c:721
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:157
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1070
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:909
WALSegmentContext segcxt
Definition: xlogreader.h:224
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3335 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3336 {
3337  WalSnd *walsnd = MyWalSnd;
3338 
3340 
3341  if (walsnd->state == state)
3342  return;
3343 
3344  SpinLockAcquire(&walsnd->mutex);
3345  walsnd->state = state;
3346  SpinLockRelease(&walsnd->mutex);
3347 }
slock_t mutex
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:804
Definition: regguts.h:317

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3194 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3195 {
3196  bool found;
3197  int i;
3198 
3199  WalSndCtl = (WalSndCtlData *)
3200  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3201 
3202  if (!found)
3203  {
3204  /* First time through, so initialize */
3206 
3207  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3209 
3210  for (i = 0; i < max_wal_senders; i++)
3211  {
3212  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3213 
3214  SpinLockInit(&walsnd->mutex);
3215  }
3216  }
3217 }
Size WalSndShmemSize(void)
Definition: walsender.c:3182
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:1008
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3182 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

3183 {
3184  Size size = 0;
3185 
3186  size = offsetof(WalSndCtlData, walsnds);
3187  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3188 
3189  return size;
3190 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

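Definition at line 229 of file walsender.c.

Note: the source listing shown below (starting at line 264) does not match this definition at line 229. Judging by its comments ("Create a per-walsender data structure in shared memory", "Let postmaster know that we're a WAL sender") and the cross-references that follow it (InitWalSenderSlot(), MarkPostmasterChildWalSender(), lag_tracker), it is the walsender initialization routine rather than WalSndShutdown(). Consult walsender.c line 229 for the actual body of WalSndShutdown().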

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndResourceCleanup(), WalSndWaitForWal(), and WalSndWriteData().

264 {
266 
267  /* Create a per-walsender data structure in shared memory */
269 
270  /*
271  * We don't currently need any ResourceOwner in a walsender process, but
272  * if we did, we could call CreateAuxProcessResourceOwner here.
273  */
274 
275  /*
276  * Let postmaster know that we're a WAL sender. Once we've declared us as
277  * a WAL sender process, postmaster will let us outlive the bgwriter and
278  * kill us last in the shutdown sequence, so we get a chance to stream all
279  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
280  * there's no going back, and we mustn't write any WAL records after this.
281  */
284 
285  /* Initialize empty timestamp buffer for lag tracking. */
287 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2499
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:307
bool RecoveryInProgress(void)
Definition: xlog.c:8341
static LagTracker * lag_tracker
Definition: walsender.c:219
MemoryContext TopMemoryContext
Definition: mcxt.c:48
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
bool am_cascading_walsender
Definition: walsender.c:116

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3163 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3164 {
3165  /* Set up signal handlers */
3167  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3168  pqsignal(SIGTERM, die); /* request shutdown */
3169  /* SIGQUIT handler was already set up by InitPostmasterChild */
3170  InitializeTimeouts(); /* establishes SIGALRM handler */
3173  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3174  * shutdown */
3175 
3176  /* Reset some signals that are accepted by postmaster but not here */
3178 }
void InitializeTimeouts(void)
Definition: timeout.c:461
#define SIGUSR1
Definition: win32_port.h:179
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3151
#define SIGCHLD
Definition: win32_port.h:177
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:172
#define SIGUSR2
Definition: win32_port.h:180
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2949
#define SIGHUP
Definition: win32_port.h:167
#define SIG_IGN
Definition: win32_port.h:164
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:162
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
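void die(SIGNAL_ARGS)
Definition: postgres.c (backend SIGTERM handler; the earlier link to the pg_test_fsync.c die(msg) macro was a spurious cross-reference)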

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1448 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1449 {
1450  static TimestampTz sendTime = 0;
1452 
1453  /*
1454  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1455  * avoid flooding the lag tracker when we commit frequently.
1456  */
1457 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1458  if (!TimestampDifferenceExceeds(sendTime, now,
1460  return;
1461 
1462  LagTrackerWrite(lsn, now);
1463  sendTime = now;
1464 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3646
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3254 of file walsender.c.

References WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, ModifyWaitEvent(), proc_exit(), WaitEventSetWait(), and WL_POSTMASTER_DEATH.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3255 {
3256  WaitEvent event;
3257 
3258  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3259  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3260  (event.events & WL_POSTMASTER_DEATH))
3261  proc_exit(1);
3262 }
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
void proc_exit(int code)
Definition: ipc.c:104
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:948
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:168
uint32 events
Definition: latch.h:145
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1306

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1474 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1475 {
1476  int wakeEvents;
1477  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1478 
1479  /*
1480  * Fast path to avoid acquiring the spinlock in case we already know we
1481  * have enough WAL available. This is particularly interesting if we're
1482  * far behind.
1483  */
1484  if (RecentFlushPtr != InvalidXLogRecPtr &&
1485  loc <= RecentFlushPtr)
1486  return RecentFlushPtr;
1487 
1488  /* Get a more recent flush pointer. */
1489  if (!RecoveryInProgress())
1490  RecentFlushPtr = GetFlushRecPtr();
1491  else
1492  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1493 
1494  for (;;)
1495  {
1496  long sleeptime;
1497 
1498  /* Clear any already-pending wakeups */
1500 
1502 
1503  /* Process any requests or signals received recently */
1504  if (ConfigReloadPending)
1505  {
1506  ConfigReloadPending = false;
1509  }
1510 
1511  /* Check for input from the client */
1513 
1514  /*
1515  * If we're shutting down, trigger pending WAL to be written out,
1516  * otherwise we'd possibly end up waiting for WAL that never gets
1517  * written, because walwriter has shut down already.
1518  */
1519  if (got_STOPPING)
1521 
1522  /* Update our idea of the currently flushed position. */
1523  if (!RecoveryInProgress())
1524  RecentFlushPtr = GetFlushRecPtr();
1525  else
1526  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1527 
1528  /*
1529  * If postmaster asked us to stop, don't wait anymore.
1530  *
1531  * It's important to do this check after the recomputation of
1532  * RecentFlushPtr, so we can send all remaining data before shutting
1533  * down.
1534  */
1535  if (got_STOPPING)
1536  break;
1537 
1538  /*
1539  * We only send regular messages to the client for full decoded
1540  * transactions, but a synchronous replication and walsender shutdown
1541  * possibly are waiting for a later location. So, before sleeping, we
1542  * send a ping containing the flush location. If the receiver is
1543  * otherwise idle, this keepalive will trigger a reply. Processing the
1544  * reply will update these MyWalSnd locations.
1545  */
1546  if (MyWalSnd->flush < sentPtr &&
1547  MyWalSnd->write < sentPtr &&
1549  WalSndKeepalive(false);
1550 
1551  /* check whether we're done */
1552  if (loc <= RecentFlushPtr)
1553  break;
1554 
1555  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1556  WalSndCaughtUp = true;
1557 
1558  /*
1559  * Try to flush any pending output to the client.
1560  */
1561  if (pq_flush_if_writable() != 0)
1562  WalSndShutdown();
1563 
1564  /*
1565  * If we have received CopyDone from the client, sent CopyDone
1566  * ourselves, and the output buffer is empty, it's time to exit
1567  * streaming, so fail the current WAL fetch request.
1568  */
1570  !pq_is_send_pending())
1571  break;
1572 
1573  /* die if timeout was reached */
1575 
1576  /* Send keepalive if the time has come */
1578 
1579  /*
1580  * Sleep until something happens or we time out. Also wait for the
1581  * socket becoming writable, if there's still pending output.
1582  * Otherwise we might sit on sendable output data while waiting for
1583  * new WAL to be generated. (But if we have nothing to send, we don't
1584  * want to wake on socket-writable.)
1585  */
1587 
1588  wakeEvents = WL_SOCKET_READABLE;
1589 
1590  if (pq_is_send_pending())
1591  wakeEvents |= WL_SOCKET_WRITEABLE;
1592 
1593  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1594  }
1595 
1596  /* reactivate latch so WalSndLoop knows to continue */
1597  SetLatch(MyLatch);
1598  return RecentFlushPtr;
1599 }
#define pq_is_send_pending()
Definition: libpq.h:48
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3254
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define WL_SOCKET_READABLE
Definition: latch.h:126
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8698
bool RecoveryInProgress(void)
Definition: xlog.c:8341
void SetLatch(Latch *latch)
Definition: latch.c:567
void ResetLatch(Latch *latch)
Definition: latch.c:660
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3585
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11942
bool XLogBackgroundFlush(void)
Definition: xlog.c:3077
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:47
void SyncRepInitConfig(void)
Definition: syncrep.c:411
static bool WalSndCaughtUp
Definition: walsender.c:183
@ PGC_SIGHUP
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3608
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2346
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2302
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:180
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static bool waiting_for_ping_response
Definition: walsender.c:171
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1817

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3297 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3298 {
3299  for (;;)
3300  {
3301  int i;
3302  bool all_stopped = true;
3303 
3304  for (i = 0; i < max_wal_senders; i++)
3305  {
3306  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3307 
3308  SpinLockAcquire(&walsnd->mutex);
3309 
3310  if (walsnd->pid == 0)
3311  {
3312  SpinLockRelease(&walsnd->mutex);
3313  continue;
3314  }
3315 
3316  if (walsnd->state != WALSNDSTATE_STOPPING)
3317  {
3318  all_stopped = false;
3319  SpinLockRelease(&walsnd->mutex);
3320  break;
3321  }
3322  SpinLockRelease(&walsnd->mutex);
3323  }
3324 
3325  /* safe to leave if confirmation is done for all WAL senders */
3326  if (all_stopped)
3327  return;
3328 
3329  pg_usleep(10000L); /* wait for 10 msec */
3330  }
3331 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3226 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3227 {
3228  int i;
3229 
3230  for (i = 0; i < max_wal_senders; i++)
3231  {
3232  Latch *latch;
3233  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3234 
3235  /*
3236  * Get latch pointer with spinlock held, for the unlikely case that
3237  * pointer reads aren't atomic (as they're 8 bytes).
3238  */
3239  SpinLockAcquire(&walsnd->mutex);
3240  latch = walsnd->latch;
3241  SpinLockRelease(&walsnd->mutex);
3242 
3243  if (latch != NULL)
3244  SetLatch(latch);
3245  }
3246 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:567
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static