PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 207 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 225 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 921 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

922 {
923  const char *snapshot_name = NULL;
924  char xloc[MAXFNAMELEN];
925  char *slot_name;
926  bool reserve_wal = false;
927  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
929  TupOutputState *tstate;
930  TupleDesc tupdesc;
931  Datum values[4];
932  bool nulls[4];
933 
935 
936  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
937 
938  /* setup state for WalSndSegmentOpen */
939  sendTimeLineIsHistoric = false;
941 
942  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
943  {
944  ReplicationSlotCreate(cmd->slotname, false,
946  }
947  else
948  {
950 
951  /*
952  * Initially create persistent slot as ephemeral - that allows us to
953  * nicely handle errors during initialization because it'll get
954  * dropped if this transaction fails. We'll make it persistent at the
955  * end. Temporary slots can be created as temporary from beginning as
956  * they get dropped on error as well.
957  */
958  ReplicationSlotCreate(cmd->slotname, true,
960  }
961 
962  if (cmd->kind == REPLICATION_KIND_LOGICAL)
963  {
965  bool need_full_snapshot = false;
966 
967  /*
968  * Do options check early so that we can bail before calling the
969  * DecodingContextFindStartpoint which can take long time.
970  */
971  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
972  {
973  if (IsTransactionBlock())
974  ereport(ERROR,
975  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
976  (errmsg("%s must not be called inside a transaction",
977  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
978 
979  need_full_snapshot = true;
980  }
981  else if (snapshot_action == CRS_USE_SNAPSHOT)
982  {
983  if (!IsTransactionBlock())
984  ereport(ERROR,
985  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
986  (errmsg("%s must be called inside a transaction",
987  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
988 
990  ereport(ERROR,
991  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
992  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
993  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
994 
995  if (FirstSnapshotSet)
996  ereport(ERROR,
997  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
998  (errmsg("%s must be called before any query",
999  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1000 
1001  if (IsSubTransaction())
1002  ereport(ERROR,
1003  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1004  (errmsg("%s must not be called in a subtransaction",
1005  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1006 
1007  need_full_snapshot = true;
1008  }
1009 
1010  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1012  XL_ROUTINE(.page_read = logical_read_xlog_page,
1013  .segment_open = WalSndSegmentOpen,
1014  .segment_close = wal_segment_close),
1017 
1018  /*
1019  * Signal that we don't need the timeout mechanism. We're just
1020  * creating the replication slot and don't yet accept feedback
1021  * messages or send keepalives. As we possibly need to wait for
1022  * further WAL the walsender would otherwise possibly be killed too
1023  * soon.
1024  */
1026 
1027  /* build initial snapshot, might take a while */
1029 
1030  /*
1031  * Export or use the snapshot if we've been asked to do so.
1032  *
1033  * NB. We will convert the snapbuild.c kind of snapshot to normal
1034  * snapshot when doing this.
1035  */
1036  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1037  {
1038  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1039  }
1040  else if (snapshot_action == CRS_USE_SNAPSHOT)
1041  {
1042  Snapshot snap;
1043 
1046  }
1047 
1048  /* don't need the decoding context anymore */
1049  FreeDecodingContext(ctx);
1050 
1051  if (!cmd->temporary)
1053  }
1054  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1055  {
1057 
1059 
1060  /* Write this slot to disk if it's a permanent one. */
1061  if (!cmd->temporary)
1063  }
1064 
1065  snprintf(xloc, sizeof(xloc), "%X/%X",
1068 
1070  MemSet(nulls, false, sizeof(nulls));
1071 
1072  /*----------
1073  * Need a tuple descriptor representing four columns:
1074  * - first field: the slot name
1075  * - second field: LSN at which we became consistent
1076  * - third field: exported snapshot's name
1077  * - fourth field: output plugin
1078  *----------
1079  */
1080  tupdesc = CreateTemplateTupleDesc(4);
1081  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1082  TEXTOID, -1, 0);
1083  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1084  TEXTOID, -1, 0);
1085  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1086  TEXTOID, -1, 0);
1087  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1088  TEXTOID, -1, 0);
1089 
1090  /* prepare for projection of tuples */
1091  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1092 
1093  /* slot_name */
1094  slot_name = NameStr(MyReplicationSlot->data.name);
1095  values[0] = CStringGetTextDatum(slot_name);
1096 
1097  /* consistent wal location */
1098  values[1] = CStringGetTextDatum(xloc);
1099 
1100  /* snapshot name, or NULL if none */
1101  if (snapshot_name != NULL)
1102  values[2] = CStringGetTextDatum(snapshot_name);
1103  else
1104  nulls[2] = true;
1105 
1106  /* plugin, or NULL if none */
1107  if (cmd->plugin != NULL)
1108  values[3] = CStringGetTextDatum(cmd->plugin);
1109  else
1110  nulls[3] = true;
1111 
1112  /* send it to dest */
1113  do_tup_output(tstate, values, nulls);
1114  end_tup_output(tstate);
1115 
1117 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:812
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:868
PGPROC * MyProc
Definition: proc.c:67
#define XACT_REPEATABLE_READ
Definition: xact.h:38
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:950
void ReplicationSlotSave(void)
Definition: slot.c:719
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2222
ReplicationSlotPersistentData data
Definition: slot.h:143
XLogRecPtr confirmed_flush
Definition: slot.h:91
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4684
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:632
void ReplicationSlotReserveWal(void)
Definition: slot.c:1078
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:527
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
bool FirstSnapshotSet
Definition: snapmgr.c:149
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:754
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1260
unsigned int uint32
Definition: c.h:375
void ReplicationSlotRelease(void)
Definition: slot.c:484
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1233
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2471
TimeLineID ThisTimeLineID
Definition: xlog.c:192
#define ereport(elevel,...)
Definition: elog.h:144
struct SnapBuild * snapshot_builder
Definition: logical.h:43
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define Assert(condition)
Definition: c.h:746
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:570
int XactIsoLevel
Definition: xact.c:75
bool IsSubTransaction(void)
Definition: xact.c:4757
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:540
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int errmsg(const char *fmt,...)
Definition: elog.c:821
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:623
#define CStringGetTextDatum(s)
Definition: builtins.h:86
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:814
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:94
#define snprintf
Definition: port.h:215
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:283
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
void ReplicationSlotMarkDirty(void)
Definition: slot.c:737
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1349

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1123 of file walsender.c.

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1124 {
1125  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1126 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:578

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1512 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1513 {
1514  int parse_rc;
1515  Node *cmd_node;
1516  const char *cmdtag;
1517  MemoryContext cmd_context;
1518  MemoryContext old_context;
1519 
1520  /*
1521  * If WAL sender has been told that shutdown is getting close, switch its
1522  * status accordingly to handle the next replication commands correctly.
1523  */
1524  if (got_STOPPING)
1526 
1527  /*
1528  * Throw error if in stopping mode. We need prevent commands that could
1529  * generate WAL while the shutdown checkpoint is being written. To be
1530  * safe, we just prohibit all new commands.
1531  */
1533  ereport(ERROR,
1534  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1535 
1536  /*
1537  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1538  * command arrives. Clean up the old stuff if there's anything.
1539  */
1541 
1543 
1544  /*
1545  * Parse the command.
1546  */
1548  "Replication command context",
1550  old_context = MemoryContextSwitchTo(cmd_context);
1551 
1552  replication_scanner_init(cmd_string);
1553  parse_rc = replication_yyparse();
1554  if (parse_rc != 0)
1555  ereport(ERROR,
1556  (errcode(ERRCODE_SYNTAX_ERROR),
1557  errmsg_internal("replication command parser returned %d",
1558  parse_rc)));
1560 
1561  cmd_node = replication_parse_result;
1562 
1563  /*
1564  * If it's a SQL command, just clean up our mess and return false; the
1565  * caller will take care of executing it.
1566  */
1567  if (IsA(cmd_node, SQLCmd))
1568  {
1569  if (MyDatabaseId == InvalidOid)
1570  ereport(ERROR,
1571  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1572 
1573  MemoryContextSwitchTo(old_context);
1574  MemoryContextDelete(cmd_context);
1575 
1576  /* Tell the caller that this wasn't a WalSender command. */
1577  return false;
1578  }
1579 
1580  /*
1581  * Report query to various monitoring facilities. For this purpose, we
1582  * report replication commands just like SQL commands.
1583  */
1584  debug_query_string = cmd_string;
1585 
1587 
1588  /*
1589  * Log replication command if log_replication_commands is enabled. Even
1590  * when it's disabled, log the command with DEBUG1 level for backward
1591  * compatibility.
1592  */
1594  (errmsg("received replication command: %s", cmd_string)));
1595 
1596  /*
1597  * Disallow replication commands in aborted transaction blocks.
1598  */
1600  ereport(ERROR,
1601  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1602  errmsg("current transaction is aborted, "
1603  "commands ignored until end of transaction block")));
1604 
1606 
1607  /*
1608  * Allocate buffers that will be used for each outgoing and incoming
1609  * message. We do this just once per command to reduce palloc overhead.
1610  */
1614 
1615  switch (cmd_node->type)
1616  {
1617  case T_IdentifySystemCmd:
1618  cmdtag = "IDENTIFY_SYSTEM";
1619  set_ps_display(cmdtag);
1620  IdentifySystem();
1621  EndReplicationCommand(cmdtag);
1622  break;
1623 
1624  case T_BaseBackupCmd:
1625  cmdtag = "BASE_BACKUP";
1626  set_ps_display(cmdtag);
1627  PreventInTransactionBlock(true, cmdtag);
1628  SendBaseBackup((BaseBackupCmd *) cmd_node);
1629  EndReplicationCommand(cmdtag);
1630  break;
1631 
1633  cmdtag = "CREATE_REPLICATION_SLOT";
1634  set_ps_display(cmdtag);
1636  EndReplicationCommand(cmdtag);
1637  break;
1638 
1640  cmdtag = "DROP_REPLICATION_SLOT";
1641  set_ps_display(cmdtag);
1643  EndReplicationCommand(cmdtag);
1644  break;
1645 
1646  case T_StartReplicationCmd:
1647  {
1648  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1649 
1650  cmdtag = "START_REPLICATION";
1651  set_ps_display(cmdtag);
1652  PreventInTransactionBlock(true, cmdtag);
1653 
1654  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1655  StartReplication(cmd);
1656  else
1658 
1659  /* dupe, but necessary per libpqrcv_endstreaming */
1660  EndReplicationCommand(cmdtag);
1661 
1662  Assert(xlogreader != NULL);
1663  break;
1664  }
1665 
1666  case T_TimeLineHistoryCmd:
1667  cmdtag = "TIMELINE_HISTORY";
1668  set_ps_display(cmdtag);
1669  PreventInTransactionBlock(true, cmdtag);
1671  EndReplicationCommand(cmdtag);
1672  break;
1673 
1674  case T_VariableShowStmt:
1675  {
1677  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1678 
1679  cmdtag = "SHOW";
1680  set_ps_display(cmdtag);
1681 
1682  /* syscache access needs a transaction environment */
1684  GetPGVariable(n->name, dest);
1686  EndReplicationCommand(cmdtag);
1687  }
1688  break;
1689 
1690  default:
1691  elog(ERROR, "unrecognized replication command node tag: %u",
1692  cmd_node->type);
1693  }
1694 
1695  /* done */
1696  MemoryContextSwitchTo(old_context);
1697  MemoryContextDelete(cmd_context);
1698 
1699  /*
1700  * We need not update ps display or pg_stat_activity, because PostgresMain
1701  * will reset those to "idle". But we must reset debug_query_string to
1702  * ensure it doesn't become a dangling pointer.
1703  */
1704  debug_query_string = NULL;
1705 
1706  return true;
1707 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3278
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:464
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1123
void CommitTransactionCommand(void)
Definition: xact.c:2947
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Node
Definition: nodes.h:528
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8993
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:925
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:530
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:570
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:88
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1133
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg_internal(const char *fmt,...)
Definition: elog.c:908
#define Assert(condition)
Definition: c.h:746
void WalSndSetState(WalSndState state)
Definition: walsender.c:3213
void StartTransactionCommand(void)
Definition: xact.c:2846
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:921
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:821
#define elog(elevel,...)
Definition: elog.h:214
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:375
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:693
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2970 of file walsender.c.

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2971 {
2972  XLogRecPtr replayPtr;
2973  TimeLineID replayTLI;
2974  XLogRecPtr receivePtr;
2976  XLogRecPtr result;
2977 
2978  /*
2979  * We can safely send what's already been replayed. Also, if walreceiver
2980  * is streaming WAL from the same timeline, we can send anything that it
2981  * has streamed, but hasn't been replayed yet.
2982  */
2983 
2984  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2985  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2986 
2987  ThisTimeLineID = replayTLI;
2988 
2989  result = replayPtr;
2990  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2991  result = receivePtr;
2992 
2993  return result;
2994 }
uint32 TimeLineID
Definition: xlogdefs.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11500
static TimeLineID receiveTLI
Definition: xlog.c:214
TimeLineID ThisTimeLineID
Definition: xlog.c:192
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3023 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3024 {
3026 
3027  /*
3028  * If replication has not yet started, die like with SIGTERM. If
3029  * replication is active, only set a flag and wake up the main loop. It
3030  * will send any outstanding WAL, wait for it to be replicated to the
3031  * standby, and then exit gracefully.
3032  */
3033  if (!replication_active)
3034  kill(MyProcPid, SIGTERM);
3035  else
3036  got_STOPPING = true;
3037 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:454
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:746

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 375 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

376 {
377  char sysid[32];
378  char xloc[MAXFNAMELEN];
379  XLogRecPtr logptr;
380  char *dbname = NULL;
381  DestReceiver *dest;
382  TupOutputState *tstate;
383  TupleDesc tupdesc;
384  Datum values[4];
385  bool nulls[4];
386 
387  /*
388  * Reply with a result set with one row, four columns. First col is system
389  * ID, second is timeline ID, third is current xlog location and the
390  * fourth contains the database name if we are connected to one.
391  */
392 
393  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
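394  GetSystemIdentifier());
395 
396  am_cascading_walsender = RecoveryInProgress();
397  if (am_cascading_walsender)
398  {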
399  /* this also updates ThisTimeLineID */
400  logptr = GetStandbyFlushRecPtr();
401  }
402  else
403  logptr = GetFlushRecPtr();
404 
405  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
406 
407  if (MyDatabaseId != InvalidOid)
408  {
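409  MemoryContext cur = CurrentMemoryContext;
410 
411  /* syscache access needs a transaction env. */
412  StartTransactionCommand();
413  /* make dbname live outside TX context */
414  MemoryContextSwitchTo(cur);
415  dbname = get_database_name(MyDatabaseId);
416  CommitTransactionCommand();
417  /* CommitTransactionCommand switches to TopMemoryContext */
418  MemoryContextSwitchTo(cur);
419  }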
420 
421  dest = CreateDestReceiver(DestRemoteSimple);
422  MemSet(nulls, false, sizeof(nulls));
423 
424  /* need a tuple descriptor representing four columns */
425  tupdesc = CreateTemplateTupleDesc(4);
426  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
427  TEXTOID, -1, 0);
428  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
429  INT4OID, -1, 0);
430  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
431  TEXTOID, -1, 0);
432  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
433  TEXTOID, -1, 0);
434 
435  /* prepare for projection of tuples */
436  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
437 
438  /* column 1: system identifier */
439  values[0] = CStringGetTextDatum(sysid);
440 
441  /* column 2: timeline */
442  values[1] = Int32GetDatum(ThisTimeLineID);
443 
444  /* column 3: wal location */
445  values[2] = CStringGetTextDatum(xloc);
446 
447  /* column 4: database name, or NULL if none */
448  if (dbname)
449  values[3] = CStringGetTextDatum(dbname);
450  else
451  nulls[3] = true;
452 
453  /* send it to dest */
454  do_tup_output(tstate, values, nulls);
455 
456  end_tup_output(tstate);
457 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void CommitTransactionCommand(void)
Definition: xact.c:2947
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:950
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8428
bool RecoveryInProgress(void)
Definition: xlog.c:8076
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
unsigned int uint32
Definition: c.h:375
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:192
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2846
char * dbname
Definition: streamutil.c:51
static Datum values[MAXATTR]
Definition: bootstrap.c:165
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4913
#define Int32GetDatum(X)
Definition: postgres.h:479
#define CStringGetTextDatum(s)
Definition: builtins.h:86
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2970
#define snprintf
Definition: port.h:215
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:418
bool am_cascading_walsender
Definition: walsender.c:116

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2393 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

2394 {
2395  int i;
2396 
2397  /*
2398  * WalSndCtl should be set up already (we inherit this by fork() or
2399  * EXEC_BACKEND mechanism from the postmaster).
2400  */
2401  Assert(WalSndCtl != NULL);
2402  Assert(MyWalSnd == NULL);
2403 
2404  /*
2405  * Find a free walsender slot and reserve it. This must not fail due to
2406  * the prior check for free WAL senders in InitProcess().
2407  */
2408  for (i = 0; i < max_wal_senders; i++)
2409  {
2410  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2411 
2412  SpinLockAcquire(&walsnd->mutex);
2413 
2414  if (walsnd->pid != 0)
2415  {
2416  SpinLockRelease(&walsnd->mutex);
2417  continue;
2418  }
2419  else
2420  {
2421  /*
2422  * Found a free slot. Reserve it for us.
2423  */
2424  walsnd->pid = MyProcPid;
2425  walsnd->state = WALSNDSTATE_STARTUP;
2426  walsnd->sentPtr = InvalidXLogRecPtr;
2427  walsnd->needreload = false;
2428  walsnd->write = InvalidXLogRecPtr;
2429  walsnd->flush = InvalidXLogRecPtr;
2430  walsnd->apply = InvalidXLogRecPtr;
2431  walsnd->writeLag = -1;
2432  walsnd->flushLag = -1;
2433  walsnd->applyLag = -1;
2434  walsnd->sync_standby_priority = 0;
2435  walsnd->latch = &MyProc->procLatch;
2436  walsnd->replyTime = 0;
2437  SpinLockRelease(&walsnd->mutex);
2438  /* don't need the lock anymore */
2439  MyWalSnd = (WalSnd *) walsnd;
2440 
2441  break;
2442  }
2443  }
2444 
2445  Assert(MyWalSnd != NULL);
2446 
2447  /* Arrange to clean up at walsender exit */
2448  on_shmem_exit(WalSndKill, 0);
2449 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:40
PGPROC * MyProc
Definition: proc.c:67
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2453
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:121
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:746
int sync_standby_priority
bool needreload
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3589 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

3590 {
3591  TimestampTz time = 0;
3592 
3593  /* Read all unread samples up to this LSN or end of buffer. */
3594  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3595  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3596  {
3597  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
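3598  lag_tracker->last_read[head] =
3599  lag_tracker->buffer[lag_tracker->read_heads[head]];
3600  lag_tracker->read_heads[head] =
3601  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3602  }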
3603 
3604  /*
3605  * If the lag tracker is empty, that means the standby has processed
3606  * everything we've ever sent so we should now clear 'last_read'. If we
3607  * didn't do that, we'd risk using a stale and irrelevant sample for
3608  * interpolation at the beginning of the next burst of WAL after a period
3609  * of idleness.
3610  */
3611  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3612  lag_tracker->last_read[head].time = 0;
3613 
3614  if (time > now)
3615  {
3616  /* If the clock somehow went backwards, treat as not found. */
3617  return -1;
3618  }
3619  else if (time == 0)
3620  {
3621  /*
3622  * We didn't cross a time. If there is a future sample that we
3623  * haven't reached yet, and we've already reached at least one sample,
3624  * let's interpolate the local flushed time. This is mainly useful
3625  * for reporting a completely stuck apply position as having
3626  * increasing lag, since otherwise we'd have to wait for it to
3627  * eventually start moving again and cross one of our samples before
3628  * we can show the lag increasing.
3629  */
3630  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3631  {
3632  /* There are no future samples, so we can't interpolate. */
3633  return -1;
3634  }
3635  else if (lag_tracker->last_read[head].time != 0)
3636  {
3637  /* We can interpolate between last_read and the next sample. */
3638  double fraction;
3639  WalTimeSample prev = lag_tracker->last_read[head];
3640  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3641 
3642  if (lsn < prev.lsn)
3643  {
3644  /*
3645  * Reported LSNs shouldn't normally go backwards, but it's
3646  * possible when there is a timeline change. Treat as not
3647  * found.
3648  */
3649  return -1;
3650  }
3651 
3652  Assert(prev.lsn < next.lsn);
3653 
3654  if (prev.time > next.time)
3655  {
3656  /* If the clock somehow went backwards, treat as not found. */
3657  return -1;
3658  }
3659 
3660  /* See how far we are between the previous and next samples. */
3661  fraction =
3662  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3663 
3664  /* Scale the local flush time proportionally. */
3665  time = (TimestampTz)
3666  ((double) prev.time + (next.time - prev.time) * fraction);
3667  }
3668  else
3669  {
3670  /*
3671  * We have only a future sample, implying that we were entirely
3672  * caught up and now there is a new burst of WAL and the
3673  * standby hasn't processed the first sample yet. Until the
3674  * standby reaches the future sample the best we can do is report
3675  * the hypothetical lag if that sample were to be replayed now.
3676  */
3677  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3678  }
3679  }
3680 
3681  /* Return the elapsed time since local flush time in microseconds. */
3682  Assert(time != 0);
3683  return now - time;
3684 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static int32 next
Definition: blutils.c:219
int write_head
Definition: walsender.c:214
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
XLogRecPtr lsn
Definition: walsender.c:202
#define Assert(condition)
Definition: c.h:746
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3524 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3525 {
3526  bool buffer_full;
3527  int new_write_head;
3528  int i;
3529 
3530  if (!am_walsender)
3531  return;
3532 
3533  /*
3534  * If the lsn hasn't advanced since last time, then do nothing. This way
3535  * we only record a new sample when new WAL has been written.
3536  */
3537  if (lag_tracker->last_lsn == lsn)
3538  return;
3539  lag_tracker->last_lsn = lsn;
3540 
3541  /*
3542  * If advancing the write head of the circular buffer would crash into any
3543  * of the read heads, then the buffer is full. In other words, the
3544  * slowest reader (presumably apply) is the one that controls the release
3545  * of space.
3546  */
3547  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3548  buffer_full = false;
3549  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3550  {
3551  if (new_write_head == lag_tracker->read_heads[i])
3552  buffer_full = true;
3553  }
3554 
3555  /*
3556  * If the buffer is full, for now we just rewind by one slot and overwrite
3557  * the last sample, as a simple (if somewhat uneven) way to lower the
3558  * sampling rate. There may be better adaptive compaction algorithms.
3559  */
3560  if (buffer_full)
3561  {
3562  new_write_head = lag_tracker->write_head;
3563  if (lag_tracker->write_head > 0)
3564  lag_tracker->write_head--;
3565  else
3566  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3567  }
3568 
3569  /* Store a sample at the current write head position. */
3570  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3571  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3572  lag_tracker->write_head = new_write_head;
3573 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
int write_head
Definition: walsender.c:214
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
bool am_walsender
Definition: walsender.c:115
XLogRecPtr lsn
Definition: walsender.c:202
XLogRecPtr last_lsn
Definition: walsender.c:212
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 814 of file walsender.c.

References CheckXLogRemoved(), XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

816 {
817  XLogRecPtr flushptr;
818  int count;
819  WALReadError errinfo;
820  XLogSegNo segno;
821 
822  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
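823  sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
824  sendTimeLine = state->currTLI;
825  sendTimeLineValidUpto = state->currTLIValidUntil;
826  sendTimeLineNextTLI = state->nextTLI;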
827 
828  /* make sure we have enough WAL available */
829  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
830 
831  /* fail if not (implies we are going to shut down) */
832  if (flushptr < targetPagePtr + reqLen)
833  return -1;
834 
835  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
836  count = XLOG_BLCKSZ; /* more than one block available */
837  else
838  count = flushptr - targetPagePtr; /* part of the page available */
839 
840  /* now actually read the data, we know it's there */
841  if (!WALRead(state,
842  cur_page,
843  targetPagePtr,
844  XLOG_BLCKSZ,
845  state->seg.ws_tli, /* Pass the current TLI because only
846  * WalSndSegmentOpen controls whether new
847  * TLI is needed. */
848  &errinfo))
849  WALReadRaiseError(&errinfo);
850 
851  /*
852  * After reading into the buffer, check that what we read was valid. We do
853  * this after reading, because even though the segment was present when we
854  * opened it, it might get recycled or removed while we read it. The
855  * read() succeeds in that case, but the data we tried to read might
856  * already have been overwritten with new WAL records.
857  */
858  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
859  CheckXLogRemoved(segno, state->seg.ws_tli);
860 
861  return count;
862 }
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:958
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:689
WALOpenSegment seg
Definition: xlogreader.h:215
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3928
uint64 XLogSegNo
Definition: xlogdefs.h:41
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:238
TimeLineID nextTLI
Definition: xlogreader.h:244
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1375
TimeLineID ThisTimeLineID
Definition: xlog.c:192
TimeLineID currTLI
Definition: xlogreader.h:228
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
TimeLineID ws_tli
Definition: xlogreader.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1063
WALSegmentContext segcxt
Definition: xlogreader.h:214
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3251 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

3252 {
3253  Interval *result = palloc(sizeof(Interval));
3254 
3255  result->month = 0;
3256  result->day = 0;
3257  result->time = offset;
3258 
3259  return result;
3260 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:950

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd cmd,
bool reserve_wal,
CRSSnapshotAction snapshot_action 
)
static

Definition at line 868 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

871 {
872  ListCell *lc;
873  bool snapshot_action_given = false;
874  bool reserve_wal_given = false;
875 
876  /* Parse options */
877  foreach(lc, cmd->options)
878  {
879  DefElem *defel = (DefElem *) lfirst(lc);
880 
881  if (strcmp(defel->defname, "export_snapshot") == 0)
882  {
883  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
884  ereport(ERROR,
885  (errcode(ERRCODE_SYNTAX_ERROR),
886  errmsg("conflicting or redundant options")));
887 
888  snapshot_action_given = true;
889  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
890  CRS_NOEXPORT_SNAPSHOT;
891  }
892  else if (strcmp(defel->defname, "use_snapshot") == 0)
893  {
894  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
895  ereport(ERROR,
896  (errcode(ERRCODE_SYNTAX_ERROR),
897  errmsg("conflicting or redundant options")));
898 
899  snapshot_action_given = true;
900  *snapshot_action = CRS_USE_SNAPSHOT;
901  }
902  else if (strcmp(defel->defname, "reserve_wal") == 0)
903  {
904  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
905  ereport(ERROR,
906  (errcode(ERRCODE_SYNTAX_ERROR),
907  errmsg("conflicting or redundant options")));
908 
909  reserve_wal_given = true;
910  *reserve_wal = true;
911  }
912  else
913  elog(ERROR, "unrecognized option: %s", defel->defname);
914  }
915 }
int errcode(int sqlerrcode)
Definition: elog.c:610
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
#define ereport(elevel,...)
Definition: elog.h:144
#define lfirst(lc)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:821
#define elog(elevel,...)
Definition: elog.h:214
char * defname
Definition: parsenodes.h:733

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3267 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, LSNGetDatum, max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3268 {
3269 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3270  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3271  TupleDesc tupdesc;
3272  Tuplestorestate *tupstore;
3273  MemoryContext per_query_ctx;
3274  MemoryContext oldcontext;
3275  SyncRepStandbyData *sync_standbys;
3276  int num_standbys;
3277  int i;
3278 
3279  /* check to see if caller supports us returning a tuplestore */
3280  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3281  ereport(ERROR,
3282  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3283  errmsg("set-valued function called in context that cannot accept a set")));
3284  if (!(rsinfo->allowedModes & SFRM_Materialize))
3285  ereport(ERROR,
3286  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3287  errmsg("materialize mode required, but it is not allowed in this context")));
3288 
3289  /* Build a tuple descriptor for our result type */
3290  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3291  elog(ERROR, "return type must be a row type");
3292 
3293  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3294  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3295 
3296  tupstore = tuplestore_begin_heap(true, false, work_mem);
3297  rsinfo->returnMode = SFRM_Materialize;
3298  rsinfo->setResult = tupstore;
3299  rsinfo->setDesc = tupdesc;
3300 
3301  MemoryContextSwitchTo(oldcontext);
3302 
3303  /*
3304  * Get the currently active synchronous standbys. This could be out of
3305  * date before we're done, but we'll use the data anyway.
3306  */
3307  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3308 
3309  for (i = 0; i < max_wal_senders; i++)
3310  {
3311  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3312  XLogRecPtr sentPtr;
3313  XLogRecPtr write;
3314  XLogRecPtr flush;
3315  XLogRecPtr apply;
3316  TimeOffset writeLag;
3317  TimeOffset flushLag;
3318  TimeOffset applyLag;
3319  int priority;
3320  int pid;
3321  WalSndState state;
3322  TimestampTz replyTime;
3323  bool is_sync_standby;
3324  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3325  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3326  int j;
3327 
3328  /* Collect data from shared memory */
3329  SpinLockAcquire(&walsnd->mutex);
3330  if (walsnd->pid == 0)
3331  {
3332  SpinLockRelease(&walsnd->mutex);
3333  continue;
3334  }
3335  pid = walsnd->pid;
3336  sentPtr = walsnd->sentPtr;
3337  state = walsnd->state;
3338  write = walsnd->write;
3339  flush = walsnd->flush;
3340  apply = walsnd->apply;
3341  writeLag = walsnd->writeLag;
3342  flushLag = walsnd->flushLag;
3343  applyLag = walsnd->applyLag;
3344  priority = walsnd->sync_standby_priority;
3345  replyTime = walsnd->replyTime;
3346  SpinLockRelease(&walsnd->mutex);
3347 
3348  /*
3349  * Detect whether walsender is/was considered synchronous. We can
3350  * provide some protection against stale data by checking the PID
3351  * along with walsnd_index.
3352  */
3353  is_sync_standby = false;
3354  for (j = 0; j < num_standbys; j++)
3355  {
3356  if (sync_standbys[j].walsnd_index == i &&
3357  sync_standbys[j].pid == pid)
3358  {
3359  is_sync_standby = true;
3360  break;
3361  }
3362  }
3363 
3364  memset(nulls, 0, sizeof(nulls));
3365  values[0] = Int32GetDatum(pid);
3366 
3367  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3368  {
3369  /*
3370  * Only superusers and members of pg_read_all_stats can see
3371  * details. Other users only get the pid value to know it's a
3372  * walsender, but no details.
3373  */
3374  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3375  }
3376  else
3377  {
3378  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3379 
3380  if (XLogRecPtrIsInvalid(sentPtr))
3381  nulls[2] = true;
3382  values[2] = LSNGetDatum(sentPtr);
3383 
3384  if (XLogRecPtrIsInvalid(write))
3385  nulls[3] = true;
3386  values[3] = LSNGetDatum(write);
3387 
3388  if (XLogRecPtrIsInvalid(flush))
3389  nulls[4] = true;
3390  values[4] = LSNGetDatum(flush);
3391 
3392  if (XLogRecPtrIsInvalid(apply))
3393  nulls[5] = true;
3394  values[5] = LSNGetDatum(apply);
3395 
3396  /*
3397  * Treat a standby such as a pg_basebackup background process
3398  * which always returns an invalid flush location, as an
3399  * asynchronous standby.
3400  */
3401  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3402 
3403  if (writeLag < 0)
3404  nulls[6] = true;
3405  else
3406  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3407 
3408  if (flushLag < 0)
3409  nulls[7] = true;
3410  else
3411  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3412 
3413  if (applyLag < 0)
3414  nulls[8] = true;
3415  else
3416  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3417 
3418  values[9] = Int32GetDatum(priority);
3419 
3420  /*
3421  * More easily understood version of standby state. This is purely
3422  * informational.
3423  *
3424  * In quorum-based sync replication, the role of each standby
3425  * listed in synchronous_standby_names can be changing very
3426  * frequently. Any standbys considered as "sync" at one moment can
3427  * be switched to "potential" ones at the next moment. So, it's
3428  * basically useless to report "sync" or "potential" as their sync
3429  * states. We report just "quorum" for them.
3430  */
3431  if (priority == 0)
3432  values[10] = CStringGetTextDatum("async");
3433  else if (is_sync_standby)
3434  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3435  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3436  else
3437  values[10] = CStringGetTextDatum("potential");
3438 
3439  if (replyTime == 0)
3440  nulls[11] = true;
3441  else
3442  values[11] = TimestampTzGetDatum(replyTime);
3443  }
3444 
3445  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3446  }
3447 
3448  /* clean up and return the tuplestore */
3449  tuplestore_donestoring(tupstore);
3450 
3451  return (Datum) 0;
3452 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:205
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:476
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:69
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:950
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:43
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3232
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:121
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define ereport(elevel,...)
Definition: elog.h:144
int allowedModes
Definition: execnodes.h:304
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4916
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:306
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:232
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:309
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:726
static Datum values[MAXATTR]
Definition: bootstrap.c:165
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:302
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:310
int errmsg(const char *fmt,...)
Definition: elog.c:821
#define elog(elevel,...)
Definition: elog.h:214
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:86
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3251
XLogRecPtr apply

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1849 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1850 {
1851  bool changed = false;
1852  ReplicationSlot *slot = MyReplicationSlot;
1853 
1854  Assert(lsn != InvalidXLogRecPtr);
1855  SpinLockAcquire(&slot->mutex);
1856  if (slot->data.restart_lsn != lsn)
1857  {
1858  changed = true;
1859  slot->data.restart_lsn = lsn;
1860  }
1861  SpinLockRelease(&slot->mutex);
1862 
1863  if (changed)
1864  {
1865  ReplicationSlotMarkDirty();
1866  ReplicationSlotsComputeRequiredLSN();
1867  }
1868 
1869  /*
1870  * One could argue that the slot should be saved to disk now, but that'd
1871  * be energy wasted - the worst lost information can do here is give us
1872  * wrong information in a statistics view - we'll just potentially be more
1873  * conservative in removing files.
1874  */
1875 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:143
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:826
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:746
XLogRecPtr restart_lsn
Definition: slot.h:80
slock_t mutex
Definition: slot.h:116
void ReplicationSlotMarkDirty(void)
Definition: slot.c:737

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1986 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1987 {
1988  bool changed = false;
1989  ReplicationSlot *slot = MyReplicationSlot;
1990 
1991  SpinLockAcquire(&slot->mutex);
1992  MyProc->xmin = InvalidTransactionId;
1993 
1994  /*
1995  * For physical replication we don't need the interlock provided by xmin
1996  * and effective_xmin since the consequences of a missed increase are
1997  * limited to query cancellations, so set both at once.
1998  */
1999  if (!TransactionIdIsNormal(slot->data.xmin) ||
2000  !TransactionIdIsNormal(feedbackXmin) ||
2001  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2002  {
2003  changed = true;
2004  slot->data.xmin = feedbackXmin;
2005  slot->effective_xmin = feedbackXmin;
2006  }
2007  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2008  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2009  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2010  {
2011  changed = true;
2012  slot->data.catalog_xmin = feedbackCatalogXmin;
2013  slot->effective_catalog_xmin = feedbackCatalogXmin;
2014  }
2015  SpinLockRelease(&slot->mutex);
2016 
2017  if (changed)
2018  {
2019  ReplicationSlotMarkDirty();
2020  ReplicationSlotsComputeRequiredXmin(false);
2021  }
2022 }
PGPROC * MyProc
Definition: proc.c:67
ReplicationSlotPersistentData data
Definition: slot.h:143
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:139
TransactionId xmin
Definition: proc.h:129
TransactionId catalog_xmin
Definition: slot.h:77
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:69
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:140
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:116
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:776
void ReplicationSlotMarkDirty(void)
Definition: slot.c:737

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1714 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1715 {
1716  unsigned char firstchar;
1717  int r;
1718  bool received = false;
1719 
1720  last_processing = GetCurrentTimestamp();
1721 
1722  for (;;)
1723  {
1724  pq_startmsgread();
1725  r = pq_getbyte_if_available(&firstchar);
1726  if (r < 0)
1727  {
1728  /* unexpected error or EOF */
1729  ereport(COMMERROR,
1730  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1731  errmsg("unexpected EOF on standby connection")));
1732  proc_exit(0);
1733  }
1734  if (r == 0)
1735  {
1736  /* no data available without blocking */
1737  pq_endmsgread();
1738  break;
1739  }
1740 
1741  /* Read the message contents */
1742  resetStringInfo(&reply_message);
1743  if (pq_getmessage(&reply_message, 0))
1744  {
1745  ereport(COMMERROR,
1746  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1747  errmsg("unexpected EOF on standby connection")));
1748  proc_exit(0);
1749  }
1750 
1751  /*
1752  * If we already received a CopyDone from the frontend, the frontend
1753  * should not send us anything until we've closed our end of the COPY.
1754  * XXX: In theory, the frontend could already send the next command
1755  * before receiving the CopyDone, but libpq doesn't currently allow
1756  * that.
1757  */
1758  if (streamingDoneReceiving && firstchar != 'X')
1759  ereport(FATAL,
1760  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1761  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1762  firstchar)));
1763 
1764  /* Handle the very limited subset of commands expected in this phase */
1765  switch (firstchar)
1766  {
1767  /*
1768  * 'd' means a standby reply wrapped in a CopyData packet.
1769  */
1770  case 'd':
1771  ProcessStandbyMessage();
1772  received = true;
1773  break;
1774 
1775  /*
1776  * CopyDone means the standby requested to finish streaming.
1777  * Reply with CopyDone, if we had not sent that already.
1778  */
1779  case 'c':
1780  if (!streamingDoneSending)
1781  {
1782  pq_putmessage_noblock('c', NULL, 0);
1783  streamingDoneSending = true;
1784  }
1785 
1786  streamingDoneReceiving = true;
1787  received = true;
1788  break;
1789 
1790  /*
1791  * 'X' means that the standby is closing down the socket.
1792  */
1793  case 'X':
1794  proc_exit(0);
1795 
1796  default:
1797  ereport(FATAL,
1798  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1799  errmsg("invalid standby message type \"%c\"",
1800  firstchar)));
1801  }
1802  }
1803 
1804  /*
1805  * Save the last reply timestamp if we've received at least one reply.
1806  */
1807  if (received)
1808  {
1809  last_reply_timestamp = last_processing;
1810  waiting_for_ping_response = false;
1811  }
1812 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1818
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1197
#define FATAL
Definition: elog.h:52
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1027
static bool streamingDoneSending
Definition: walsender.c:179
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1259
static StringInfoData reply_message
Definition: walsender.c:158
void pq_endmsgread(void)
Definition: pqcomm.c:1221
#define ereport(elevel,...)
Definition: elog.h:144
static bool streamingDoneReceiving
Definition: walsender.c:180
int errmsg(const char *fmt,...)
Definition: elog.c:821
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2066 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, log_min_messages, WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

2067 {
2068  TransactionId feedbackXmin;
2069  uint32 feedbackEpoch;
2070  TransactionId feedbackCatalogXmin;
2071  uint32 feedbackCatalogEpoch;
2072  TimestampTz replyTime;
2073 
2074  /*
2075  * Decipher the reply message. The caller already consumed the msgtype
2076  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2077  * of this message.
2078  */
2079  replyTime = pq_getmsgint64(&reply_message);
2080  feedbackXmin = pq_getmsgint(&reply_message, 4);
2081  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2082  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2083  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2084 
2085  if (log_min_messages <= DEBUG2)
2086  {
2087  char *replyTimeStr;
2088 
2089  /* Copy because timestamptz_to_str returns a static buffer */
2090  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2091 
2092  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2093  feedbackXmin,
2094  feedbackEpoch,
2095  feedbackCatalogXmin,
2096  feedbackCatalogEpoch,
2097  replyTimeStr);
2098 
2099  pfree(replyTimeStr);
2100  }
2101 
2102  /*
2103  * Update shared state for this WalSender process based on reply data from
2104  * standby.
2105  */
2106  {
2107  WalSnd *walsnd = MyWalSnd;
2108 
2109  SpinLockAcquire(&walsnd->mutex);
2110  walsnd->replyTime = replyTime;
2111  SpinLockRelease(&walsnd->mutex);
2112  }
2113 
2114  /*
2115  * Unset WalSender's xmins if the feedback message values are invalid.
2116  * This happens when the downstream turned hot_standby_feedback off.
2117  */
2118  if (!TransactionIdIsNormal(feedbackXmin)
2119  && !TransactionIdIsNormal(feedbackCatalogXmin))
2120  {
2121  MyProc->xmin = InvalidTransactionId;
2122  if (MyReplicationSlot != NULL)
2123  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2124  return;
2125  }
2126 
2127  /*
2128  * Check that the provided xmin/epoch are sane, that is, not in the future
2129  * and not so far back as to be already wrapped around. Ignore if not.
2130  */
2131  if (TransactionIdIsNormal(feedbackXmin) &&
2132  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2133  return;
2134 
2135  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2136  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2137  return;
2138 
2139  /*
2140  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2141  * the xmin will be taken into account by GetSnapshotData() /
2142  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2143  * thereby prevent the generation of cleanup conflicts on the standby
2144  * server.
2145  *
2146  * There is a small window for a race condition here: although we just
2147  * checked that feedbackXmin precedes nextXid, the nextXid could have
2148  * gotten advanced between our fetching it and applying the xmin below,
2149  * perhaps far enough to make feedbackXmin wrap around. In that case the
2150  * xmin we set here would be "in the future" and have no effect. No point
2151  * in worrying about this since it's too late to save the desired data
2152  * anyway. Assuming that the standby sends us an increasing sequence of
2153  * xmins, this could only happen during the first reply cycle, else our
2154  * own xmin would prevent nextXid from advancing so far.
2155  *
2156  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2157  * is assumed atomic, and there's no real need to prevent concurrent
2158  * horizon determinations. (If we're moving our xmin forward, this is
2159  * obviously safe, and if we're moving it backwards, well, the data is at
2160  * risk already since a VACUUM could already have determined the horizon.)
2161  *
2162  * If we're using a replication slot we reserve the xmin via that,
2163  * otherwise via the walsender's PGPROC entry. We can only track the
2164  * catalog xmin separately when using a slot, so we store the least of the
2165  * two provided when not using a slot.
2166  *
2167  * XXX: It might make sense to generalize the ephemeral slot concept and
2168  * always use the slot mechanism to handle the feedback xmin.
2169  */
2170  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2171  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2172  else
2173  {
2174  if (TransactionIdIsNormal(feedbackCatalogXmin)
2175  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2176  MyProc->xmin = feedbackCatalogXmin;
2177  else
2178  MyProc->xmin = feedbackXmin;
2179  }
2180 }
uint32 TransactionId
Definition: c.h:521
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1187
slock_t mutex
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2035
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1057
TransactionId xmin
Definition: proc.h:129
#define DEBUG2
Definition: elog.h:24
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:375
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
int log_min_messages
Definition: guc.c:538
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:214
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1986
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1740

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1818 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1819 {
1820  char msgtype;
1821 
1822  /*
1823  * Check message type from the first byte.
1824  */
1825  msgtype = pq_getmsgbyte(&reply_message);
1826 
1827  switch (msgtype)
1828  {
1829  case 'r':
1830  ProcessStandbyReplyMessage();
1831  break;
1832 
1833  case 'h':
1834  ProcessStandbyHSFeedbackMessage();
1835  break;
1836 
1837  default:
1838  ereport(COMMERROR,
1839  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1840  errmsg("unexpected message type \"%c\"", msgtype)));
1841  proc_exit(0);
1842  }
1843 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
#define COMMERROR
Definition: elog.h:30
static StringInfoData reply_message
Definition: walsender.c:158
#define ereport(elevel,...)
Definition: elog.h:144
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1881
int errmsg(const char *fmt,...)
Definition: elog.c:821
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2066

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1881 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), log_min_messages, LogicalConfirmReceivedLocation(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1882 {
1883  XLogRecPtr writePtr,
1884  flushPtr,
1885  applyPtr;
1886  bool replyRequested;
1887  TimeOffset writeLag,
1888  flushLag,
1889  applyLag;
1890  bool clearLagTimes;
1891  TimestampTz now;
1892  TimestampTz replyTime;
1893 
1894  static bool fullyAppliedLastTime = false;
1895 
1896  /* the caller already consumed the msgtype byte */
1897  writePtr = pq_getmsgint64(&reply_message);
1898  flushPtr = pq_getmsgint64(&reply_message);
1899  applyPtr = pq_getmsgint64(&reply_message);
1900  replyTime = pq_getmsgint64(&reply_message);
1901  replyRequested = pq_getmsgbyte(&reply_message);
1902 
1903  if (log_min_messages <= DEBUG2)
1904  {
1905  char *replyTimeStr;
1906 
1907  /* Copy because timestamptz_to_str returns a static buffer */
1908  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1909 
1910  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1911  (uint32) (writePtr >> 32), (uint32) writePtr,
1912  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1913  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1914  replyRequested ? " (reply requested)" : "",
1915  replyTimeStr);
1916 
1917  pfree(replyTimeStr);
1918  }
1919 
1920  /* See if we can compute the round-trip lag for these positions. */
1921  now = GetCurrentTimestamp();
1922  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1923  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1924  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1925 
1926  /*
1927  * If the standby reports that it has fully replayed the WAL in two
1928  * consecutive reply messages, then the second such message must result
1929  * from wal_receiver_status_interval expiring on the standby. This is a
1930  * convenient time to forget the lag times measured when it last
1931  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1932  * until more WAL traffic arrives.
1933  */
1934  clearLagTimes = false;
1935  if (applyPtr == sentPtr)
1936  {
1937  if (fullyAppliedLastTime)
1938  clearLagTimes = true;
1939  fullyAppliedLastTime = true;
1940  }
1941  else
1942  fullyAppliedLastTime = false;
1943 
1944  /* Send a reply if the standby requested one. */
1945  if (replyRequested)
1946  WalSndKeepalive(false);
1947 
1948  /*
1949  * Update shared state for this WalSender process based on reply data from
1950  * standby.
1951  */
1952  {
1953  WalSnd *walsnd = MyWalSnd;
1954 
1955  SpinLockAcquire(&walsnd->mutex);
1956  walsnd->write = writePtr;
1957  walsnd->flush = flushPtr;
1958  walsnd->apply = applyPtr;
1959  if (writeLag != -1 || clearLagTimes)
1960  walsnd->writeLag = writeLag;
1961  if (flushLag != -1 || clearLagTimes)
1962  walsnd->flushLag = flushLag;
1963  if (applyLag != -1 || clearLagTimes)
1964  walsnd->applyLag = applyLag;
1965  walsnd->replyTime = replyTime;
1966  SpinLockRelease(&walsnd->mutex);
1967  }
1968 
1969  if (!am_cascading_walsender)
1970  SyncRepReleaseWaiters();
1971 
1972  /*
1973  * Advance our local xmin horizon when the client confirmed a flush.
1974  */
1975  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
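1976  {
1977  if (SlotIsLogical(MyReplicationSlot))
1978  LogicalConfirmReceivedLocation(flushPtr);
1979  else
1980  PhysicalConfirmReceivedLocation(flushPtr);
1981  }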
1982 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1187
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1057
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3463
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1849
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
unsigned int uint32
Definition: c.h:375
#define SlotIsLogical(slot)
Definition: slot.h:165
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3589
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
int log_min_messages
Definition: guc.c:538
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1372
#define elog(elevel,...)
Definition: elog.h:214
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:441
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
bool am_cascading_walsender
Definition: walsender.c:116
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1740

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 464 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

465 {
466  StringInfoData buf;
467  char histfname[MAXFNAMELEN];
468  char path[MAXPGPATH];
469  int fd;
470  off_t histfilelen;
471  off_t bytesleft;
472  Size len;
473 
474  /*
475  * Reply with a result set with one row, and two columns. The first col is
476  * the name of the history file, 2nd is the contents.
477  */
478 
479  TLHistoryFileName(histfname, cmd->timeline);
480  TLHistoryFilePath(path, cmd->timeline);
481 
482  /* Send a RowDescription message */
483  pq_beginmessage(&buf, 'T');
484  pq_sendint16(&buf, 2); /* 2 fields */
485 
486  /* first field */
487  pq_sendstring(&buf, "filename"); /* col name */
488  pq_sendint32(&buf, 0); /* table oid */
489  pq_sendint16(&buf, 0); /* attnum */
490  pq_sendint32(&buf, TEXTOID); /* type oid */
491  pq_sendint16(&buf, -1); /* typlen */
492  pq_sendint32(&buf, 0); /* typmod */
493  pq_sendint16(&buf, 0); /* format code */
494 
495  /* second field */
496  pq_sendstring(&buf, "content"); /* col name */
497  pq_sendint32(&buf, 0); /* table oid */
498  pq_sendint16(&buf, 0); /* attnum */
499  pq_sendint32(&buf, BYTEAOID); /* type oid */
500  pq_sendint16(&buf, -1); /* typlen */
501  pq_sendint32(&buf, 0); /* typmod */
502  pq_sendint16(&buf, 0); /* format code */
503  pq_endmessage(&buf);
504 
505  /* Send a DataRow message */
506  pq_beginmessage(&buf, 'D');
507  pq_sendint16(&buf, 2); /* # of columns */
508  len = strlen(histfname);
509  pq_sendint32(&buf, len); /* col1 len */
510  pq_sendbytes(&buf, histfname, len);
511 
512  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
513  if (fd < 0)
514  ereport(ERROR,
515  (errcode_for_file_access(),
516  errmsg("could not open file \"%s\": %m", path)));
517 
518  /* Determine file length and send it to client */
519  histfilelen = lseek(fd, 0, SEEK_END);
520  if (histfilelen < 0)
521  ereport(ERROR,
522  (errcode_for_file_access(),
523  errmsg("could not seek to end of file \"%s\": %m", path)));
524  if (lseek(fd, 0, SEEK_SET) != 0)
525  ereport(ERROR,
526  (errcode_for_file_access(),
527  errmsg("could not seek to beginning of file \"%s\": %m", path)));
528 
529  pq_sendint32(&buf, histfilelen); /* col2 len */
530 
531  bytesleft = histfilelen;
532  while (bytesleft > 0)
533  {
534  PGAlignedBlock rbuf;
535  int nread;
536 
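537  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
538  nread = read(fd, rbuf.data, sizeof(rbuf));
539  pgstat_report_wait_end();
540  if (nread < 0)
541  ereport(ERROR,
542  (errcode_for_file_access(),
543  errmsg("could not read file \"%s\": %m",
544  path)));
545  else if (nread == 0)
546  ereport(ERROR,
547  (errcode(ERRCODE_DATA_CORRUPTED),
548  errmsg("could not read file \"%s\": read %d of %zu",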
549  path, nread, (Size) bytesleft)));
550 
551  pq_sendbytes(&buf, rbuf.data, nread);
552  bytesleft -= nread;
553  }
554 
555  if (CloseTransientFile(fd) != 0)
556  ereport(ERROR,
557  (errcode_for_file_access(),
558  errmsg("could not close file \"%s\": %m", path)));
559 
560  pq_endmessage(&buf);
561 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
int errcode(int sqlerrcode)
Definition: elog.c:610
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1213
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1083
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:68
int errcode_for_file_access(void)
Definition: elog.c:633
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1466
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2549
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:144
size_t Size
Definition: c.h:474
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1442
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:821
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1133 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, SAB_Error, WalSnd::sentPtr, sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), XLogRecPtrIsInvalid, and XLogSendLogical().

Referenced by exec_replication_command().

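1134 {
1135  StringInfoData buf;
1136  QueryCompletion qc;
1137 
1138  /* make sure that our requirements are still fulfilled */
1139  CheckLogicalDecodingRequirements();
1140 
1141  Assert(!MyReplicationSlot);
1142 
1143  (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
1144 
1145  if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
1146  ereport(ERROR,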
1147  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1148  errmsg("cannot read from logical replication slot \"%s\"",
1149  cmd->slotname),
1150  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1151 
1152  /*
1153  * Force a disconnect, so that the decoding code doesn't need to care
1154  * about an eventual switch from running in recovery, to running in a
1155  * normal environment. Client code is expected to handle reconnects.
1156  */
1157  if (am_cascading_walsender && !RecoveryInProgress())
1158  {
1159  ereport(LOG,
1160  (errmsg("terminating walsender process after promotion")));
1161  got_STOPPING = true;
1162  }
1163 
1164  /*
1165  * Create our decoding context, making it start at the previously ack'ed
1166  * position.
1167  *
1168  * Do this before sending a CopyBothResponse message, so that any errors
1169  * are reported early.
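1170  */
1171  logical_decoding_ctx =
1172  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1173  XL_ROUTINE(.page_read = logical_read_xlog_page,
1174  .segment_open = WalSndSegmentOpen,
1175  .segment_close = wal_segment_close),
1176  WalSndPrepareWrite, WalSndWriteData,
1177  WalSndUpdateProgress);
1178  xlogreader = logical_decoding_ctx->reader;
1179 
1180  WalSndSetState(WALSNDSTATE_CATCHUP);
1181 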
1182  /* Send a CopyBothResponse message, and start streaming */
1183  pq_beginmessage(&buf, 'W');
1184  pq_sendbyte(&buf, 0);
1185  pq_sendint16(&buf, 0);
1186  pq_endmessage(&buf);
1187  pq_flush();
1188 
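1189  /* Start reading WAL from the oldest required WAL. */
1190  XLogBeginRead(logical_decoding_ctx->reader,
1191  MyReplicationSlot->data.restart_lsn);
1192 
1193  /*
1194  * Report the location after which we'll send out further commits as the
1195  * current sentPtr.
1196  */
1197  sentPtr = MyReplicationSlot->data.confirmed_flush;
1198 
1199  /* Also update the sent position status in shared memory */
1200  SpinLockAcquire(&MyWalSnd->mutex);
1201  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1202  SpinLockRelease(&MyWalSnd->mutex);
1203 
1204  replication_active = true;
1205 
1206  SyncRepInitConfig();
1207 
1208  /* Main loop of walsender */
1209  WalSndLoop(XLogSendLogical);
1210 
1211  FreeDecodingContext(logical_decoding_ctx);
1212  ReplicationSlotRelease();
1213 
1214  replication_active = false;
1215  if (got_STOPPING)
1216  proc_exit(0);
1217  WalSndSetState(WALSNDSTATE_STARTUP);
1218 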
1219  /* Get out of COPY mode (CommandComplete). */
1220  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1221  EndCommand(&qc, DestRemote, false);
1222 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:812
#define pq_flush()
Definition: libpq.h:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
ReplicationSlotPersistentData data
Definition: slot.h:143
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8076
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:91
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:43
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:435
static char * buf
Definition: pg_test_fsync.c:68
int errdetail(const char *fmt,...)
Definition: elog.c:954
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:240
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1260
void ReplicationSlotRelease(void)
Definition: slot.c:484
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:412
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2267
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1233
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2471
#define ereport(elevel,...)
Definition: elog.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:746
void WalSndSetState(WalSndState state)
Definition: walsender.c:3213
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:570
static void XLogSendLogical(void)
Definition: walsender.c:2850
XLogRecPtr restart_lsn
Definition: slot.h:80
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int errmsg(const char *fmt,...)
Definition: elog.c:821
XLogReaderState * reader
Definition: logical.h:41
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:375
Definition: slot.h:42
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:814
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:94
bool am_cascading_walsender
Definition: walsender.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1349

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 570 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), SAB_Error, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

571 {
573  XLogRecPtr FlushPtr;
574 
575  if (ThisTimeLineID == 0)
576  ereport(ERROR,
577  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
578  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
579 
580  /* create xlogreader for physical replication */
581  xlogreader =
583  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
584  .segment_close = wal_segment_close),
585  NULL);
586 
587  if (!xlogreader)
588  ereport(ERROR,
589  (errcode(ERRCODE_OUT_OF_MEMORY),
590  errmsg("out of memory")));
591 
592  /*
593  * We assume here that we're logging enough information in the WAL for
594  * log-shipping, since this is checked in PostmasterMain().
595  *
596  * NOTE: wal_level can only change at shutdown, so in most cases it is
597  * difficult for there to be WAL data that we can still see that was
598  * written at wal_level='minimal'.
599  */
600 
601  if (cmd->slotname)
602  {
605  ereport(ERROR,
606  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
607  errmsg("cannot use a logical replication slot for physical replication")));
608 
609  /*
610  * We don't need to verify the slot's restart_lsn here; instead we
611  * rely on the caller requesting the starting point to use. If the
612  * WAL segment doesn't exist, we'll fail later.
613  */
614  }
615 
616  /*
617  * Select the timeline. If it was given explicitly by the client, use
618  * that. Otherwise use the timeline of the last replayed record, which is
619  * kept in ThisTimeLineID.
620  */
622  {
623  /* this also updates ThisTimeLineID */
624  FlushPtr = GetStandbyFlushRecPtr();
625  }
626  else
627  FlushPtr = GetFlushRecPtr();
628 
629  if (cmd->timeline != 0)
630  {
631  XLogRecPtr switchpoint;
632 
633  sendTimeLine = cmd->timeline;
635  {
636  sendTimeLineIsHistoric = false;
638  }
639  else
640  {
641  List *timeLineHistory;
642 
643  sendTimeLineIsHistoric = true;
644 
645  /*
646  * Check that the timeline the client requested exists, and the
647  * requested start location is on that timeline.
648  */
649  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
650  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
652  list_free_deep(timeLineHistory);
653 
654  /*
655  * Found the requested timeline in the history. Check that
656  * requested startpoint is on that timeline in our history.
657  *
658  * This is quite loose on purpose. We only check that we didn't
659  * fork off the requested timeline before the switchpoint. We
660  * don't check that we switched *to* it before the requested
661  * starting point. This is because the client can legitimately
662  * request to start replication from the beginning of the WAL
663  * segment that contains switchpoint, but on the new timeline, so
664  * that it doesn't end up with a partial segment. If you ask for
665  * too old a starting point, you'll get an error later when we
666  * fail to find the requested WAL segment in pg_wal.
667  *
668  * XXX: we could be more strict here and only allow a startpoint
669  * that's older than the switchpoint, if it's still in the same
670  * WAL segment.
671  */
672  if (!XLogRecPtrIsInvalid(switchpoint) &&
673  switchpoint < cmd->startpoint)
674  {
675  ereport(ERROR,
676  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
677  (uint32) (cmd->startpoint >> 32),
678  (uint32) (cmd->startpoint),
679  cmd->timeline),
680  errdetail("This server's history forked from timeline %u at %X/%X.",
681  cmd->timeline,
682  (uint32) (switchpoint >> 32),
683  (uint32) (switchpoint))));
684  }
685  sendTimeLineValidUpto = switchpoint;
686  }
687  }
688  else
689  {
692  sendTimeLineIsHistoric = false;
693  }
694 
696 
697  /* If there is nothing to stream, don't even enter COPY mode */
699  {
700  /*
701  * When we first start replication the standby will be behind the
702  * primary. For some applications, for example synchronous
703  * replication, it is important to have a clear state for this initial
704  * catchup mode, so we can trigger actions when we change streaming
705  * state later. We may stay in this state for a long time, which is
706  * exactly why we want to be able to monitor whether or not we are
707  * still here.
708  */
710 
711  /* Send a CopyBothResponse message, and start streaming */
712  pq_beginmessage(&buf, 'W');
713  pq_sendbyte(&buf, 0);
714  pq_sendint16(&buf, 0);
715  pq_endmessage(&buf);
716  pq_flush();
717 
718  /*
719  * Don't allow a request to stream from a future point in WAL that
720  * hasn't been flushed to disk in this server yet.
721  */
722  if (FlushPtr < cmd->startpoint)
723  {
724  ereport(ERROR,
725  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
726  (uint32) (cmd->startpoint >> 32),
727  (uint32) (cmd->startpoint),
728  (uint32) (FlushPtr >> 32),
729  (uint32) (FlushPtr))));
730  }
731 
732  /* Start streaming from the requested point */
733  sentPtr = cmd->startpoint;
734 
735  /* Initialize shared memory status, too */
739 
741 
742  /* Main loop of walsender */
743  replication_active = true;
744 
746 
747  replication_active = false;
748  if (got_STOPPING)
749  proc_exit(0);
751 
753  }
754 
755  if (cmd->slotname)
757 
758  /*
759  * Copy is finished now. Send a single-row result set indicating the next
760  * timeline.
761  */
763  {
764  char startpos_str[8 + 1 + 8 + 1];
766  TupOutputState *tstate;
767  TupleDesc tupdesc;
768  Datum values[2];
769  bool nulls[2];
770 
771  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
772  (uint32) (sendTimeLineValidUpto >> 32),
774 
776  MemSet(nulls, false, sizeof(nulls));
777 
778  /*
779  * Need a tuple descriptor representing two columns. int8 may seem
780  * like a surprising data type for this, but in theory int4 would not
781  * be wide enough for this, as TimeLineID is unsigned.
782  */
783  tupdesc = CreateTemplateTupleDesc(2);
784  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
785  INT8OID, -1, 0);
786  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
787  TEXTOID, -1, 0);
788 
789  /* prepare for projection of tuple */
790  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
791 
792  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
793  values[1] = CStringGetTextDatum(startpos_str);
794 
795  /* send it to dest */
796  do_tup_output(tstate, values, nulls);
797 
798  end_tup_output(tstate);
799  }
800 
801  /* Send CommandComplete message */
802  EndReplicationCommand("START_STREAMING");
803 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:812
#define pq_flush()
Definition: libpq.h:39
int wal_segment_size
Definition: xlog.c:117
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
static void XLogSendPhysical(void)
Definition: walsender.c:2549
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
#define MemSet(start, val, len)
Definition: c.h:950
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8428
void list_free_deep(List *list)
Definition: list.c:1390
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
static char * buf
Definition: pg_test_fsync.c:68
static bool streamingDoneSending
Definition: walsender.c:179
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:954
unsigned int uint32
Definition: c.h:375
void ReplicationSlotRelease(void)
Definition: slot.c:484
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:75
#define SlotIsLogical(slot)
Definition: slot.h:165
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1701
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:412
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2267
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:367
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2471
TimeLineID ThisTimeLineID
Definition: xlog.c:192
#define ereport(elevel,...)
Definition: elog.h:144
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:746
void WalSndSetState(WalSndState state)
Definition: walsender.c:3213
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
static bool streamingDoneReceiving
Definition: walsender.c:180
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int errmsg(const char *fmt,...)
Definition: elog.c:821
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:375
Definition: slot.h:42
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:86
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2970
Definition: pg_list.h:50
#define snprintf
Definition: port.h:215
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
bool am_cascading_walsender
Definition: walsender.c:116

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2035 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

2036 {
2037  FullTransactionId nextFullXid;
2038  TransactionId nextXid;
2039  uint32 nextEpoch;
2040 
2041  nextFullXid = ReadNextFullTransactionId();
2042  nextXid = XidFromFullTransactionId(nextFullXid);
2043  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2044 
2045  if (xid <= nextXid)
2046  {
2047  if (epoch != nextEpoch)
2048  return false;
2049  }
2050  else
2051  {
2052  if (epoch + 1 != nextEpoch)
2053  return false;
2054  }
2055 
2056  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2057  return false; /* epoch OK, but it's wrapped around */
2058 
2059  return true;
2060 }
uint32 TransactionId
Definition: c.h:521
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:375
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2240 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2241 {
2242  TimestampTz timeout;
2243 
2244  /* don't bail out if we're doing something that doesn't require timeouts */
2245  if (last_reply_timestamp <= 0)
2246  return;
2247 
2250 
2251  if (wal_sender_timeout > 0 && last_processing >= timeout)
2252  {
2253  /*
2254  * Since typically expiration of replication timeout means
2255  * communication problem, we don't send the error message to the
2256  * standby.
2257  */
2259  (errmsg("terminating walsender process due to replication timeout")));
2260 
2261  WalSndShutdown();
2262  }
2263 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:162
#define COMMERROR
Definition: elog.h:30
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:821
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2190 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifference(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2191 {
2192  long sleeptime = 10000; /* 10 s */
2193 
2195  {
2196  TimestampTz wakeup_time;
2197  long sec_to_timeout;
2198  int microsec_to_timeout;
2199 
2200  /*
2201  * At the latest stop sleeping once wal_sender_timeout has been
2202  * reached.
2203  */
2206 
2207  /*
2208  * If no ping has been sent yet, wakeup when it's time to do so.
2209  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2210  * the timeout passed without a response.
2211  */
2214  wal_sender_timeout / 2);
2215 
2216  /* Compute relative time until wakeup. */
2217  TimestampDifference(now, wakeup_time,
2218  &sec_to_timeout, &microsec_to_timeout);
2219 
2220  sleeptime = sec_to_timeout * 1000 +
2221  microsec_to_timeout / 1000;
2222  }
2223 
2224  return sleeptime;
2225 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:41
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1652
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2930 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2931 {
2932  XLogRecPtr replicatedPtr;
2933 
2934  /* ... let's just be real sure we're caught up ... */
2935  send_data();
2936 
2937  /*
2938  * To figure out whether all WAL has successfully been replicated, check
2939  * flush location if valid, write otherwise. Tools like pg_receivewal will
2940  * usually (unless in synchronous mode) return an invalid flush location.
2941  */
2942  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2944 
2945  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2946  !pq_is_send_pending())
2947  {
2948  QueryCompletion qc;
2949 
2950  /* Inform the standby that XLOG streaming is done */
2951  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2952  EndCommand(&qc, DestRemote, false);
2953  pq_flush();
2954 
2955  proc_exit(0);
2956  }
2958  WalSndKeepalive(true);
2959 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3463
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 295 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

296 {
300 
301  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303 
304  if (MyReplicationSlot != NULL)
306 
308 
309  replication_active = false;
310 
311  /*
312  * If there is a transaction in progress, it will clean up our
313  * ResourceOwner, but if a replication command set up a resource owner
314  * without a transaction, we've got to clean that up now.
315  */
317  WalSndResourceCleanup(false);
318 
319  if (got_STOPPING || got_SIGUSR2)
320  proc_exit(0);
321 
322  /* Revert back to startup state */
324 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:812
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
WALOpenSegment seg
Definition: xlogreader.h:215
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:484
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1466
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:330
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3213
void ReplicationSlotCleanup(void)
Definition: slot.c:540
void LWLockReleaseAll(void)
Definition: lwlock.c:1911

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3232 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3233 {
3234  switch (state)
3235  {
3236  case WALSNDSTATE_STARTUP:
3237  return "startup";
3238  case WALSNDSTATE_BACKUP:
3239  return "backup";
3240  case WALSNDSTATE_CATCHUP:
3241  return "catchup";
3242  case WALSNDSTATE_STREAMING:
3243  return "streaming";
3244  case WALSNDSTATE_STOPPING:
3245  return "stopping";
3246  }
3247  return "UNKNOWN";
3248 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3149 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3150 {
3151  int i;
3152 
3153  for (i = 0; i < max_wal_senders; i++)
3154  {
3155  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3156  pid_t pid;
3157 
3158  SpinLockAcquire(&walsnd->mutex);
3159  pid = walsnd->pid;
3160  SpinLockRelease(&walsnd->mutex);
3161 
3162  if (pid == 0)
3163  continue;
3164 
3166  }
3167 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3463 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, and waiting_for_ping_response.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3464 {
3465  elog(DEBUG2, "sending replication keepalive");
3466 
3467  /* construct the message... */
3469  pq_sendbyte(&output_message, 'k');
3472  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3473 
3474  /* ... and send it wrapped in CopyData */
3476 
3477  /* Set local flag */
3478  if (requestReply)
3480 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
static StringInfoData output_message
Definition: walsender.c:157
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog(elevel,...)
Definition: elog.h:214
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3486 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3487 {
3488  TimestampTz ping_time;
3489 
3490  /*
3491  * Don't send keepalive messages if timeouts are globally disabled or
3492  * we're doing something not partaking in timeouts.
3493  */
3494  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3495  return;
3496 
3498  return;
3499 
3500  /*
3501  * If half of wal_sender_timeout has lapsed without receiving any reply
3502  * from the standby, send a keep-alive message to the standby requesting
3503  * an immediate reply.
3504  */
3506  wal_sender_timeout / 2);
3507  if (last_processing >= ping_time)
3508  {
3509  WalSndKeepalive(true);
3510 
3511  /* Try to flush pending output to the client */
3512  if (pq_flush_if_writable() != 0)
3513  WalSndShutdown();
3514  }
3515 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3463
static TimestampTz last_processing
Definition: walsender.c:162
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2453 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2454 {
2455  WalSnd *walsnd = MyWalSnd;
2456 
2457  Assert(walsnd != NULL);
2458 
2459  MyWalSnd = NULL;
2460 
2461  SpinLockAcquire(&walsnd->mutex);
2462  /* clear latch while holding the spinlock, so it can safely be read */
2463  walsnd->latch = NULL;
2464  /* Mark WalSnd struct as no longer being in use. */
2465  walsnd->pid = 0;
2466  SpinLockRelease(&walsnd->mutex);
2467 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:746

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3045 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

3046 {
3047  int save_errno = errno;
3048 
3049  got_SIGUSR2 = true;
3050  SetLatch(MyLatch);
3051 
3052  errno = save_errno;
3053 }
void SetLatch(Latch *latch)
Definition: latch.c:505
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
struct Latch * MyLatch
Definition: globals.c:54

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2267 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

2268 {
2269  /*
2270  * Initialize the last reply timestamp. That enables timeout processing
2271  * from hereon.
2272  */
2274  waiting_for_ping_response = false;
2275 
2276  /*
2277  * Loop until we reach the end of this timeline or the client requests to
2278  * stop streaming.
2279  */
2280  for (;;)
2281  {
2282  /* Clear any already-pending wakeups */
2284 
2286 
2287  /* Process any requests or signals received recently */
2288  if (ConfigReloadPending)
2289  {
2290  ConfigReloadPending = false;
2293  }
2294 
2295  /* Check for input from the client */
2297 
2298  /*
2299  * If we have received CopyDone from the client, sent CopyDone
2300  * ourselves, and the output buffer is empty, it's time to exit
2301  * streaming.
2302  */
2304  !pq_is_send_pending())
2305  break;
2306 
2307  /*
2308  * If we don't have any pending data in the output buffer, try to send
2309  * some more. If there is some, we don't bother to call send_data
2310  * again until we've flushed it ... but we'd better assume we are not
2311  * caught up.
2312  */
2313  if (!pq_is_send_pending())
2314  send_data();
2315  else
2316  WalSndCaughtUp = false;
2317 
2318  /* Try to flush pending output to the client */
2319  if (pq_flush_if_writable() != 0)
2320  WalSndShutdown();
2321 
2322  /* If nothing remains to be sent right now ... */
2324  {
2325  /*
2326  * If we're in catchup state, move to streaming. This is an
2327  * important state change for users to know about, since before
2328  * this point data loss might occur if the primary dies and we
2329  * need to failover to the standby. The state change is also
2330  * important for synchronous replication, since commits that
2331  * started to wait at that point might wait for some time.
2332  */
2334  {
2335  ereport(DEBUG1,
2336  (errmsg("\"%s\" has now caught up with upstream server",
2337  application_name)));
2339  }
2340 
2341  /*
2342  * When SIGUSR2 arrives, we send any outstanding logs up to the
2343  * shutdown checkpoint record (i.e., the latest record), wait for
2344  * them to be replicated to the standby, and exit. This may be a
2345  * normal termination at shutdown, or a promotion, the walsender
2346  * is not sure which.
2347  */
2348  if (got_SIGUSR2)
2349  WalSndDone(send_data);
2350  }
2351 
2352  /* Check for replication timeout. */
2354 
2355  /* Send keepalive if the time has come */
2357 
2358  /*
2359  * Block if we have unsent data. XXX For logical replication, let
2360  * WalSndWaitForWal() handle any other blocking; idle receivers need
2361  * its additional actions. For physical replication, also block if
2362  * caught up; its send_data does not block.
2363  */
2364  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2367  {
2368  long sleeptime;
2369  int wakeEvents;
2370 
2371  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2373 
2374  /*
2375  * Use fresh timestamp, not last_processing, to reduce the chance
2376  * of reaching wal_sender_timeout before sending a keepalive.
2377  */
2379 
2380  if (pq_is_send_pending())
2381  wakeEvents |= WL_SOCKET_WRITEABLE;
2382 
2383  /* Sleep until something happens or we time out */
2384  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2385  MyProcPort->sock, sleeptime,
2387  }
2388  }
2389 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2930
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:588
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SyncRepInitConfig(void)
Definition: syncrep.c:412
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3486
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2240
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
#define ereport(elevel,...)
Definition: elog.h:144
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2190
void WalSndSetState(WalSndState state)
Definition: walsender.c:3213
static void XLogSendLogical(void)
Definition: walsender.c:2850
static bool streamingDoneReceiving
Definition: walsender.c:180
char * application_name
Definition: guc.c:563
int errmsg(const char *fmt,...)
Definition: elog.c:821
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1714
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1233 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1234 {
1235  /* can't have sync rep confused by sending the same LSN several times */
1236  if (!last_write)
1237  lsn = InvalidXLogRecPtr;
1238 
1239  resetStringInfo(ctx->out);
1240 
1241  pq_sendbyte(ctx->out, 'w');
1242  pq_sendint64(ctx->out, lsn); /* dataStart */
1243  pq_sendint64(ctx->out, lsn); /* walEnd */
1244 
1245  /*
1246  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1247  * reserve space here.
1248  */
1249  pq_sendint64(ctx->out, 0); /* sendtime */
1250 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
StringInfo out
Definition: logical.h:70

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 330 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

331 {
332  ResourceOwner resowner;
333 
334  if (CurrentResourceOwner == NULL)
335  return;
336 
337  /*
338  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
339  * in a local variable and clear it first.
340  */
341  resowner = CurrentResourceOwner;
342  CurrentResourceOwner = NULL;
343 
344  /* Now we can release resources and delete it. */
345  ResourceOwnerRelease(resowner,
346  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
347  ResourceOwnerRelease(resowner,
348  RESOURCE_RELEASE_LOCKS, isCommit, true);
349  ResourceOwnerRelease(resowner,
350  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
351  ResourceOwnerDelete(resowner);
352 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:712
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:478

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3000 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

3001 {
3002  int i;
3003 
3004  for (i = 0; i < max_wal_senders; i++)
3005  {
3006  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3007 
3008  SpinLockAcquire(&walsnd->mutex);
3009  if (walsnd->pid == 0)
3010  {
3011  SpinLockRelease(&walsnd->mutex);
3012  continue;
3013  }
3014  walsnd->needreload = true;
3015  SpinLockRelease(&walsnd->mutex);
3016  }
3017 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState * state,
XLogSegNo  nextSegNo,
TimeLineID * tli_p 
)
static

Definition at line 2471 of file walsender.c.

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, WALOpenSegment::ws_file, WALOpenSegment::ws_segno, WALSegmentContext::ws_segsize, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

2473 {
2474  char path[MAXPGPATH];
2475 
2476  /*-------
2477  * When reading from a historic timeline, and there is a timeline switch
2478  * within this segment, read from the WAL segment belonging to the new
2479  * timeline.
2480  *
2481  * For example, imagine that this server is currently on timeline 5, and
2482  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2483  * 0/13002088. In pg_wal, we have these files:
2484  *
2485  * ...
2486  * 000000040000000000000012
2487  * 000000040000000000000013
2488  * 000000050000000000000013
2489  * 000000050000000000000014
2490  * ...
2491  *
2492  * In this situation, when requested to send the WAL from segment 0x13, on
2493  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2494  * recovery prefers files from newer timelines, so if the segment was
2495  * restored from the archive on this server, the file belonging to the old
2496  * timeline, 000000040000000000000013, might not exist. Their contents are
2497  * equal up to the switchpoint, because at a timeline switch, the used
2498  * portion of the old segment is copied to the new file. -------
2499  */
2500  *tli_p = sendTimeLine;
2502  {
2503  XLogSegNo endSegNo;
2504 
2506  if (state->seg.ws_segno == endSegNo)
2507  *tli_p = sendTimeLineNextTLI;
2508  }
2509 
2510  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2511  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2512  if (state->seg.ws_file >= 0)
2513  return;
2514 
2515  /*
2516  * If the file is not found, assume it's because the standby asked for a
2517  * too old WAL segment that has already been removed or recycled.
2518  */
2519  if (errno == ENOENT)
2520  {
2521  char xlogfname[MAXFNAMELEN];
2522  int save_errno = errno;
2523 
2524  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2525  errno = save_errno;
2526  ereport(ERROR,
2528  errmsg("requested WAL segment %s has already been removed",
2529  xlogfname)));
2530  }
2531  else
2532  ereport(ERROR,
2534  errmsg("could not open file \"%s\": %m",
2535  path)));
2536 }
int wal_segment_size
Definition: xlog.c:117
#define PG_BINARY
Definition: c.h:1213
WALOpenSegment seg
Definition: xlogreader.h:215
#define ERROR
Definition: elog.h:43
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
XLogSegNo ws_segno
Definition: xlogreader.h:47
int errcode_for_file_access(void)
Definition: elog.c:633
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:144
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:986
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:821
WALSegmentContext segcxt
Definition: xlogreader.h:214
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3213 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3214 {
3215  WalSnd *walsnd = MyWalSnd;
3216 
3218 
3219  if (walsnd->state == state)
3220  return;
3221 
3222  SpinLockAcquire(&walsnd->mutex);
3223  walsnd->state = state;
3224  SpinLockRelease(&walsnd->mutex);
3225 }
slock_t mutex
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:746
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3088 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3089 {
3090  bool found;
3091  int i;
3092 
3093  WalSndCtl = (WalSndCtlData *)
3094  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3095 
3096  if (!found)
3097  {
3098  /* First time through, so initialize */
3100 
3101  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3103 
3104  for (i = 0; i < max_wal_senders; i++)
3105  {
3106  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3107 
3108  SpinLockInit(&walsnd->mutex);
3109  }
3110  }
3111 }
Size WalSndShmemSize(void)
Definition: walsender.c:3076
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:950
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3076 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3077 {
3078  Size size = 0;
3079 
3080  size = offsetof(WalSndCtlData, walsnds);
3081  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3082 
3083  return size;
3084 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:474
#define offsetof(type, field)
Definition: c.h:669

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 229 of file walsender.c.

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndResourceCleanup(), WalSndWaitForWal(), and WalSndWriteData().

262 {
264 
265  /* Create a per-walsender data structure in shared memory */
267 
268  /*
269  * We don't currently need any ResourceOwner in a walsender process, but
270  * if we did, we could call CreateAuxProcessResourceOwner here.
271  */
272 
273  /*
274  * Let postmaster know that we're a WAL sender. Once we've declared us as
275  * a WAL sender process, postmaster will let us outlive the bgwriter and
276  * kill us last in the shutdown sequence, so we get a chance to stream all
277  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
278  * there's no going back, and we mustn't write any WAL records after this.
279  */
282 
283  /* Initialize empty timestamp buffer for lag tracking. */
285 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2393
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
bool RecoveryInProgress(void)
Definition: xlog.c:8076
static LagTracker * lag_tracker
Definition: walsender.c:219
MemoryContext TopMemoryContext
Definition: mcxt.c:44
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
bool am_cascading_walsender
Definition: walsender.c:116

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3057 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3058 {
3059  /* Set up signal handlers */
3061  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3062  pqsignal(SIGTERM, die); /* request shutdown */
3063  /* SIGQUIT handler was already set up by InitPostmasterChild */
3064  InitializeTimeouts(); /* establishes SIGALRM handler */
3067  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3068  * shutdown */
3069 
3070  /* Reset some signals that are accepted by postmaster but not here */
3072 }
void InitializeTimeouts(void)
Definition: timeout.c:346
#define SIGUSR1
Definition: win32_port.h:171
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3045
#define SIGCHLD
Definition: win32_port.h:169
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2857
#define SIGHUP
Definition: win32_port.h:159
#define SIG_IGN
Definition: win32_port.h:156
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:154
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:551
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1349 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1350 {
1351  static TimestampTz sendTime = 0;
1353 
1354  /*
1355  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1356  * avoid flooding the lag tracker when we commit frequently.
1357  */
1358 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1359  if (!TimestampDifferenceExceeds(sendTime, now,
1361  return;
1362 
1363  LagTrackerWrite(lsn, now);
1364  sendTime = now;
1365 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1677
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3524
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1375 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1376 {
1377  int wakeEvents;
1378  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1379 
1380  /*
1381  * Fast path to avoid acquiring the spinlock in case we already know we
1382  * have enough WAL available. This is particularly interesting if we're
1383  * far behind.
1384  */
1385  if (RecentFlushPtr != InvalidXLogRecPtr &&
1386  loc <= RecentFlushPtr)
1387  return RecentFlushPtr;
1388 
1389  /* Get a more recent flush pointer. */
1390  if (!RecoveryInProgress())
1391  RecentFlushPtr = GetFlushRecPtr();
1392  else
1393  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1394 
1395  for (;;)
1396  {
1397  long sleeptime;
1398 
1399  /* Clear any already-pending wakeups */
1401 
1403 
1404  /* Process any requests or signals received recently */
1405  if (ConfigReloadPending)
1406  {
1407  ConfigReloadPending = false;
1410  }
1411 
1412  /* Check for input from the client */
1414 
1415  /*
1416  * If we're shutting down, trigger pending WAL to be written out,
1417  * otherwise we'd possibly end up waiting for WAL that never gets
1418  * written, because walwriter has shut down already.
1419  */
1420  if (got_STOPPING)
1422 
1423  /* Update our idea of the currently flushed position. */
1424  if (!RecoveryInProgress())
1425  RecentFlushPtr = GetFlushRecPtr();
1426  else
1427  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1428 
1429  /*
1430  * If postmaster asked us to stop, don't wait anymore.
1431  *
1432  * It's important to do this check after the recomputation of
1433  * RecentFlushPtr, so we can send all remaining data before shutting
1434  * down.
1435  */
1436  if (got_STOPPING)
1437  break;
1438 
1439  /*
1440  * We only send regular messages to the client for full decoded
1441  * transactions, but a synchronous replication and walsender shutdown
1442  * possibly are waiting for a later location. So, before sleeping, we
1443  * send a ping containing the flush location. If the receiver is
1444  * otherwise idle, this keepalive will trigger a reply. Processing the
1445  * reply will update these MyWalSnd locations.
1446  */
1447  if (MyWalSnd->flush < sentPtr &&
1448  MyWalSnd->write < sentPtr &&
1450  WalSndKeepalive(false);
1451 
1452  /* check whether we're done */
1453  if (loc <= RecentFlushPtr)
1454  break;
1455 
1456  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1457  WalSndCaughtUp = true;
1458 
1459  /*
1460  * Try to flush any pending output to the client.
1461  */
1462  if (pq_flush_if_writable() != 0)
1463  WalSndShutdown();
1464 
1465  /*
1466  * If we have received CopyDone from the client, sent CopyDone
1467  * ourselves, and the output buffer is empty, it's time to exit
1468  * streaming, so fail the current WAL fetch request.
1469  */
1471  !pq_is_send_pending())
1472  break;
1473 
1474  /* die if timeout was reached */
1476 
1477  /* Send keepalive if the time has come */
1479 
1480  /*
1481  * Sleep until something happens or we time out. Also wait for the
1482  * socket becoming writable, if there's still pending output.
1483  * Otherwise we might sit on sendable output data while waiting for
1484  * new WAL to be generated. (But if we have nothing to send, we don't
1485  * want to wake on socket-writable.)
1486  */
1488 
1489  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1491 
1492  if (pq_is_send_pending())
1493  wakeEvents |= WL_SOCKET_WRITEABLE;
1494 
1495  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1496  MyProcPort->sock, sleeptime,
1498  }
1499 
1500  /* reactivate latch so WalSndLoop knows to continue */
1501  SetLatch(MyLatch);
1502  return RecentFlushPtr;
1503 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8428
int sleeptime
Definition: pg_standby.c:41
bool RecoveryInProgress(void)
Definition: xlog.c:8076
void SetLatch(Latch *latch)
Definition: latch.c:505
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:588
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3463
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11500
bool XLogBackgroundFlush(void)
Definition: xlog.c:3036
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
void SyncRepInitConfig(void)
Definition: syncrep.c:412
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3486
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2240
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2190
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:180
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1714
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3175 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3176 {
3177  for (;;)
3178  {
3179  int i;
3180  bool all_stopped = true;
3181 
3182  for (i = 0; i < max_wal_senders; i++)
3183  {
3184  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3185 
3186  SpinLockAcquire(&walsnd->mutex);
3187 
3188  if (walsnd->pid == 0)
3189  {
3190  SpinLockRelease(&walsnd->mutex);
3191  continue;
3192  }
3193 
3194  if (walsnd->state != WALSNDSTATE_STOPPING)
3195  {
3196  all_stopped = false;
3197  SpinLockRelease(&walsnd->mutex);
3198  break;
3199  }
3200  SpinLockRelease(&walsnd->mutex);
3201  }
3202 
3203  /* safe to leave if confirmation is done for all WAL senders */
3204  if (all_stopped)
3205  return;
3206 
3207  pg_usleep(10000L); /* wait for 10 msec */
3208  }
3209 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3120 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3121 {
3122  int i;
3123 
3124  for (i = 0; i < max_wal_senders; i++)
3125  {
3126  Latch *latch;
3127  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3128 
3129  /*
3130  * Get latch pointer with spinlock held, for the unlikely case that
3131  * pointer reads aren't atomic (as they're 8 bytes).
3132  */
3133  SpinLockAcquire(&walsnd->mutex);
3134  latch = walsnd->latch;
3135  SpinLockRelease(&walsnd->mutex);
3136 
3137  if (latch != NULL)
3138  SetLatch(latch);
3139  }
3140 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:505
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1260 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1262 {
1263  TimestampTz now;
1264 
1265  /*
1266  * Fill the send timestamp last, so that it is taken as late as possible.
1267  * This is somewhat ugly, but the protocol is set as it's already used for
1268  * several releases by streaming physical replication.
1269  */
1271  now = GetCurrentTimestamp();
1272  pq_sendint64(&tmpbuf, now);
1273  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1274  tmpbuf.data, sizeof(int64));
1275 
1276  /* output previously gathered data in a CopyData packet */
1277  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1278 
1280 
1281  /* Try to flush pending output to the client */
1282  if (pq_flush_if_writable() != 0)
1283  WalSndShutdown();
1284 
1285  /* Try taking fast path unless we get too close to walsender timeout. */
1287  wal_sender_timeout / 2) &&
1288  !pq_is_send_pending())
1289  {
1290  return;
1291  }
1292 
1293  /* If we have pending write here, go to slow path */
1294  for (;;)
1295  {
1296  int wakeEvents;
1297  long sleeptime;
1298 
1299  /* Check for input from the client */
1301 
1302  /* die if timeout was reached */
1304 
1305  /* Send keepalive if the time has come */
1307 
1308  if (!pq_is_send_pending())
1309  break;
1310 
1312 
1313  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1315 
1316  /* Sleep until something happens or we time out */
1317  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1318  MyProcPort->sock, sleeptime,
1320 
1321  /* Clear any already-pending wakeups */
1323 
1325 
1326  /* Process any requests or signals received recently */
1327  if (ConfigReloadPending)
1328  {
1329  ConfigReloadPending = false;
1332  }
1333 
1334  /* Try to flush pending output to the client */
1335  if (pq_flush_if_writable() != 0)
1336  WalSndShutdown();
1337  }
1338 
1339  /* reactivate latch so WalSndLoop knows to continue */
1340  SetLatch(MyLatch);
1341 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:123
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:505
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:588
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void SyncRepInitConfig(void)
Definition: syncrep.c:412
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3486
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2240
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2190
static StringInfoData tmpbuf
Definition: walsender.c:159
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:70
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1714
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2850 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

2851 {
2852  XLogRecord *record;
2853  char *errm;
2854 
2855  /*
2856  * We'll use the current flush point to determine whether we've caught up.
2857  * This variable is static in order to cache it across calls. Caching is
2858  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2859  * spinlock.
2860  */
2861  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2862 
2863  /*
2864  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2865  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2866  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2867  * didn't wait - i.e. when we're shutting down.
2868  */
2869  WalSndCaughtUp = false;
2870 
2871  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2872 
2873  /* xlog record was invalid */
2874  if (errm != NULL)
2875  elog(ERROR, "%s", errm);
2876 
2877  if (record != NULL)
2878  {
2879  /*
2880  * Note the lack of any call to LagTrackerWrite() which is handled by
2881  * WalSndUpdateProgress which is called by output plugin through
2882  * logical decoding write api.
2883  */
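2884  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2885 
2886  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2887  }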
2888 
2889  /*
2890  * If first time through in this session, initialize flushPtr. Otherwise,
2891  * we only need to update flushPtr if EndRecPtr is past it.
2892  */
2893  if (flushPtr == InvalidXLogRecPtr)
2894  flushPtr = GetFlushRecPtr();
2895  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2896  flushPtr = GetFlushRecPtr();
2897 
2898  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2899  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2900  WalSndCaughtUp = true;
2901 
2902  /*
2903  * If we're caught up and have been requested to stop, have WalSndLoop()
2904  * terminate the connection in an orderly manner, after writing out all
2905  * the pending data.
2906  */
2907  if (WalSndCaughtUp && got_STOPPING)
2908  got_SIGUSR2 = true;
2909 
2910  /* Update shared memory status */
2911  {
2912  WalSnd *walsnd = MyWalSnd;
2913 
2914  SpinLockAcquire(&walsnd->mutex);
2915  walsnd->sentPtr = sentPtr;
2916  SpinLockRelease(&walsnd->mutex);
2917  }
2918 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8428
slock_t mutex
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:268
#define ERROR
Definition: