PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 207 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 225 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 921 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, and XL_ROUTINE.

Referenced by exec_replication_command().

922 {
923  const char *snapshot_name = NULL;
924  char xloc[MAXFNAMELEN];
925  char *slot_name;
926  bool reserve_wal = false;
927  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
928  DestReceiver *dest;
929  TupOutputState *tstate;
930  TupleDesc tupdesc;
931  Datum values[4];
932  bool nulls[4];
933 
934  Assert(!MyReplicationSlot);
935 
936  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
937 
938  /* setup state for WalSndSegmentOpen */
939  sendTimeLineIsHistoric = false;
940  sendTimeLine = ThisTimeLineID;
941 
942  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
943  {
944  ReplicationSlotCreate(cmd->slotname, false,
945  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
946  }
947  else
948  {
949  CheckLogicalDecodingRequirements();
950 
951  /*
952  * Initially create persistent slot as ephemeral - that allows us to
953  * nicely handle errors during initialization because it'll get
954  * dropped if this transaction fails. We'll make it persistent at the
955  * end. Temporary slots can be created as temporary from beginning as
956  * they get dropped on error as well.
957  */
958  ReplicationSlotCreate(cmd->slotname, true,
959  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
960  }
961 
962  if (cmd->kind == REPLICATION_KIND_LOGICAL)
963  {
964  LogicalDecodingContext *ctx;
965  bool need_full_snapshot = false;
966 
967  /*
968  * Do options check early so that we can bail before calling the
969  * DecodingContextFindStartpoint which can take long time.
970  */
971  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
972  {
973  if (IsTransactionBlock())
974  ereport(ERROR,
975  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
976  (errmsg("%s must not be called inside a transaction",
977  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
978 
979  need_full_snapshot = true;
980  }
981  else if (snapshot_action == CRS_USE_SNAPSHOT)
982  {
983  if (!IsTransactionBlock())
984  ereport(ERROR,
985  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
986  (errmsg("%s must be called inside a transaction",
987  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
988 
989  if (XactIsoLevel != XACT_REPEATABLE_READ)
990  ereport(ERROR,
991  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
992  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
993  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
994 
995  if (FirstSnapshotSet)
996  ereport(ERROR,
997  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
998  (errmsg("%s must be called before any query",
999  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1000 
1001  if (IsSubTransaction())
1002  ereport(ERROR,
1003  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1004  (errmsg("%s must not be called in a subtransaction",
1005  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1006 
1007  need_full_snapshot = true;
1008  }
1009 
1010  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1011  InvalidXLogRecPtr,
1012  XL_ROUTINE(.page_read = logical_read_xlog_page,
1013  .segment_open = WalSndSegmentOpen,
1014  .segment_close = wal_segment_close),
1015  WalSndPrepareWrite, WalSndWriteData,
1016  WalSndUpdateProgress);
1017 
1018  /*
1019  * Signal that we don't need the timeout mechanism. We're just
1020  * creating the replication slot and don't yet accept feedback
1021  * messages or send keepalives. As we possibly need to wait for
1022  * further WAL the walsender would otherwise possibly be killed too
1023  * soon.
1024  */
1025  last_reply_timestamp = 0;
1026 
1027  /* build initial snapshot, might take a while */
1028  DecodingContextFindStartpoint(ctx);
1029 
1030  /*
1031  * Export or use the snapshot if we've been asked to do so.
1032  *
1033  * NB. We will convert the snapbuild.c kind of snapshot to normal
1034  * snapshot when doing this.
1035  */
1036  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1037  {
1038  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1039  }
1040  else if (snapshot_action == CRS_USE_SNAPSHOT)
1041  {
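1042  Snapshot snap;
1043 
1044  snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1045  RestoreTransactionSnapshot(snap, MyProc);
1046  }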
1047 
1048  /* don't need the decoding context anymore */
1049  FreeDecodingContext(ctx);
1050 
1051  if (!cmd->temporary)
1052  ReplicationSlotPersist();
1053  }
1054  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1055  {
1056  ReplicationSlotReserveWal();
1057 
1058  ReplicationSlotMarkDirty();
1059 
1060  /* Write this slot to disk if it's a permanent one. */
1061  if (!cmd->temporary)
1062  ReplicationSlotSave();
1063  }
1064 
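1065  snprintf(xloc, sizeof(xloc), "%X/%X",
1066  (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
1067  (uint32) MyReplicationSlot->data.confirmed_flush);
1068 
1069  dest = CreateDestReceiver(DestRemoteSimple);
1070  MemSet(nulls, false, sizeof(nulls));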
1071 
1072  /*----------
1073  * Need a tuple descriptor representing four columns:
1074  * - first field: the slot name
1075  * - second field: LSN at which we became consistent
1076  * - third field: exported snapshot's name
1077  * - fourth field: output plugin
1078  *----------
1079  */
1080  tupdesc = CreateTemplateTupleDesc(4);
1081  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1082  TEXTOID, -1, 0);
1083  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1084  TEXTOID, -1, 0);
1085  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1086  TEXTOID, -1, 0);
1087  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1088  TEXTOID, -1, 0);
1089 
1090  /* prepare for projection of tuples */
1091  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1092 
1093  /* slot_name */
1094  slot_name = NameStr(MyReplicationSlot->data.name);
1095  values[0] = CStringGetTextDatum(slot_name);
1096 
1097  /* consistent wal location */
1098  values[1] = CStringGetTextDatum(xloc);
1099 
1100  /* snapshot name, or NULL if none */
1101  if (snapshot_name != NULL)
1102  values[2] = CStringGetTextDatum(snapshot_name);
1103  else
1104  nulls[2] = true;
1105 
1106  /* plugin, or NULL if none */
1107  if (cmd->plugin != NULL)
1108  values[3] = CStringGetTextDatum(cmd->plugin);
1109  else
1110  nulls[3] = true;
1111 
1112  /* send it to dest */
1113  do_tup_output(tstate, values, nulls);
1114  end_tup_output(tstate);
1115 
1116  ReplicationSlotRelease();
1117 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:811
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:868
PGPROC * MyProc
Definition: proc.c:68
#define XACT_REPEATABLE_READ
Definition: xact.h:38
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
#define MemSet(start, val, len)
Definition: c.h:996
void ReplicationSlotSave(void)
Definition: slot.c:719
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2222
ReplicationSlotPersistentData data
Definition: slot.h:143
XLogRecPtr confirmed_flush
Definition: slot.h:91
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4685
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:632
void ReplicationSlotReserveWal(void)
Definition: slot.c:1078
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:562
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:45
bool FirstSnapshotSet
Definition: snapmgr.c:149
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:754
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1260
unsigned int uint32
Definition: c.h:429
void ReplicationSlotRelease(void)
Definition: slot.c:484
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1233
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2459
TimeLineID ThisTimeLineID
Definition: xlog.c:193
#define ereport(elevel,...)
Definition: elog.h:155
struct SnapBuild * snapshot_builder
Definition: logical.h:43
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define Assert(condition)
Definition: c.h:792
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:605
int XactIsoLevel
Definition: xact.c:75
bool IsSubTransaction(void)
Definition: xact.c:4758
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:540
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int errmsg(const char *fmt,...)
Definition: elog.c:915
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:669
#define CStringGetTextDatum(s)
Definition: builtins.h:82
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:814
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
#define snprintf
Definition: port.h:215
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:318
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
void ReplicationSlotMarkDirty(void)
Definition: slot.c:737
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1349

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1123 of file walsender.c.

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command().

1124 {
1125  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1126 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:578

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1512 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1513 {
1514  int parse_rc;
1515  Node *cmd_node;
1516  const char *cmdtag;
1517  MemoryContext cmd_context;
1518  MemoryContext old_context;
1519 
1520  /*
1521  * If WAL sender has been told that shutdown is getting close, switch its
1522  * status accordingly to handle the next replication commands correctly.
1523  */
1524  if (got_STOPPING)
1525  WalSndSetState(WALSNDSTATE_STOPPING);
1526 
1527  /*
1528  * Throw error if in stopping mode. We need prevent commands that could
1529  * generate WAL while the shutdown checkpoint is being written. To be
1530  * safe, we just prohibit all new commands.
1531  */
1532  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1533  ereport(ERROR,
1534  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1535 
1536  /*
1537  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1538  * command arrives. Clean up the old stuff if there's anything.
1539  */
1540  SnapBuildClearExportedSnapshot();
1541 
1542  CHECK_FOR_INTERRUPTS();
1543 
1544  /*
1545  * Parse the command.
1546  */
1547  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1548  "Replication command context",
1549  ALLOCSET_DEFAULT_SIZES);
1550  old_context = MemoryContextSwitchTo(cmd_context);
1551 
1552  replication_scanner_init(cmd_string);
1553  parse_rc = replication_yyparse();
1554  if (parse_rc != 0)
1555  ereport(ERROR,
1556  (errcode(ERRCODE_SYNTAX_ERROR),
1557  errmsg_internal("replication command parser returned %d",
1558  parse_rc)));
1559  replication_scanner_finish();
1560 
1561  cmd_node = replication_parse_result;
1562 
1563  /*
1564  * If it's a SQL command, just clean up our mess and return false; the
1565  * caller will take care of executing it.
1566  */
1567  if (IsA(cmd_node, SQLCmd))
1568  {
1569  if (MyDatabaseId == InvalidOid)
1570  ereport(ERROR,
1571  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1572 
1573  MemoryContextSwitchTo(old_context);
1574  MemoryContextDelete(cmd_context);
1575 
1576  /* Tell the caller that this wasn't a WalSender command. */
1577  return false;
1578  }
1579 
1580  /*
1581  * Report query to various monitoring facilities. For this purpose, we
1582  * report replication commands just like SQL commands.
1583  */
1584  debug_query_string = cmd_string;
1585 
1586  pgstat_report_activity(STATE_RUNNING, cmd_string);
1587 
1588  /*
1589  * Log replication command if log_replication_commands is enabled. Even
1590  * when it's disabled, log the command with DEBUG1 level for backward
1591  * compatibility.
1592  */
1593  ereport(log_replication_commands ? LOG : DEBUG1,
1594  (errmsg("received replication command: %s", cmd_string)));
1595 
1596  /*
1597  * Disallow replication commands in aborted transaction blocks.
1598  */
1599  if (IsAbortedTransactionBlockState())
1600  ereport(ERROR,
1601  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1602  errmsg("current transaction is aborted, "
1603  "commands ignored until end of transaction block")));
1604 
1605  CHECK_FOR_INTERRUPTS();
1606 
1607  /*
1608  * Allocate buffers that will be used for each outgoing and incoming
1609  * message. We do this just once per command to reduce palloc overhead.
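1610  */
1611  initStringInfo(&output_message);
1612  initStringInfo(&reply_message);
1613  initStringInfo(&tmpbuf);
1614 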
1615  switch (cmd_node->type)
1616  {
1617  case T_IdentifySystemCmd:
1618  cmdtag = "IDENTIFY_SYSTEM";
1619  set_ps_display(cmdtag);
1620  IdentifySystem();
1621  EndReplicationCommand(cmdtag);
1622  break;
1623 
1624  case T_BaseBackupCmd:
1625  cmdtag = "BASE_BACKUP";
1626  set_ps_display(cmdtag);
1627  PreventInTransactionBlock(true, cmdtag);
1628  SendBaseBackup((BaseBackupCmd *) cmd_node);
1629  EndReplicationCommand(cmdtag);
1630  break;
1631 
1632  case T_CreateReplicationSlotCmd:
1633  cmdtag = "CREATE_REPLICATION_SLOT";
1634  set_ps_display(cmdtag);
1635  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1636  EndReplicationCommand(cmdtag);
1637  break;
1638 
1639  case T_DropReplicationSlotCmd:
1640  cmdtag = "DROP_REPLICATION_SLOT";
1641  set_ps_display(cmdtag);
1642  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1643  EndReplicationCommand(cmdtag);
1644  break;
1645 
1646  case T_StartReplicationCmd:
1647  {
1648  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1649 
1650  cmdtag = "START_REPLICATION";
1651  set_ps_display(cmdtag);
1652  PreventInTransactionBlock(true, cmdtag);
1653 
1654  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1655  StartReplication(cmd);
1656  else
1657  StartLogicalReplication(cmd);
1658 
1659  /* dupe, but necessary per libpqrcv_endstreaming */
1660  EndReplicationCommand(cmdtag);
1661 
1662  Assert(xlogreader != NULL);
1663  break;
1664  }
1665 
1666  case T_TimeLineHistoryCmd:
1667  cmdtag = "TIMELINE_HISTORY";
1668  set_ps_display(cmdtag);
1669  PreventInTransactionBlock(true, cmdtag);
1670  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1671  EndReplicationCommand(cmdtag);
1672  break;
1673 
1674  case T_VariableShowStmt:
1675  {
1676  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1677  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1678 
1679  cmdtag = "SHOW";
1680  set_ps_display(cmdtag);
1681 
1682  /* syscache access needs a transaction environment */
1683  StartTransactionCommand();
1684  GetPGVariable(n->name, dest);
1685  CommitTransactionCommand();
1686  EndReplicationCommand(cmdtag);
1687  }
1688  break;
1689 
1690  default:
1691  elog(ERROR, "unrecognized replication command node tag: %u",
1692  cmd_node->type);
1693  }
1694 
1695  /* done */
1696  MemoryContextSwitchTo(old_context);
1697  MemoryContextDelete(cmd_context);
1698 
1699  /*
1700  * We need not update ps display or pg_stat_activity, because PostgresMain
1701  * will reset those to "idle". But we must reset debug_query_string to
1702  * ensure it doesn't become a dangling pointer.
1703  */
1704  debug_query_string = NULL;
1705 
1706  return true;
1707 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3355
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:464
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1123
void CommitTransactionCommand(void)
Definition: xact.c:2948
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:528
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:704
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9130
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:45
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:932
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:530
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:570
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3381
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:89
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1133
Oid MyDatabaseId
Definition: globals.c:86
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:155
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1002
#define Assert(condition)
Definition: c.h:792
void WalSndSetState(WalSndState state)
Definition: walsender.c:3201
void StartTransactionCommand(void)
Definition: xact.c:2847
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:921
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define elog(elevel,...)
Definition: elog.h:228
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
static void IdentifySystem(void)
Definition: walsender.c:375
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:693
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2958 of file walsender.c.

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2959 {
2960  XLogRecPtr replayPtr;
2961  TimeLineID replayTLI;
2962  XLogRecPtr receivePtr;
2963  TimeLineID receiveTLI;
2964  XLogRecPtr result;
2965 
2966  /*
2967  * We can safely send what's already been replayed. Also, if walreceiver
2968  * is streaming WAL from the same timeline, we can send anything that it
2969  * has streamed, but hasn't been replayed yet.
2970  */
2971 
2972  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2973  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2974 
2975  ThisTimeLineID = replayTLI;
2976 
2977  result = replayPtr;
2978  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2979  result = receivePtr;
2980 
2981  return result;
2982 }
uint32 TimeLineID
Definition: xlogdefs.h:52
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11638
static TimeLineID receiveTLI
Definition: xlog.c:215
TimeLineID ThisTimeLineID
Definition: xlog.c:193
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3011 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3012 {
3013  Assert(am_walsender);
3014 
3015  /*
3016  * If replication has not yet started, die like with SIGTERM. If
3017  * replication is active, only set a flag and wake up the main loop. It
3018  * will send any outstanding WAL, wait for it to be replicated to the
3019  * standby, and then exit gracefully.
3020  */
3021  if (!replication_active)
3022  kill(MyProcPid, SIGTERM);
3023  else
3024  got_STOPPING = true;
3025 }
int MyProcPid
Definition: globals.c:41
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:454
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:792

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 375 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

/*
 * IdentifySystem: sends a single-row, four-column result (systemid, timeline,
 * xlogpos, dbname) to 'dest'.
 * - systemid is formatted with UINT64_FORMAT.
 * - xlogpos is formatted as "%X/%X" from GetStandbyFlushRecPtr() in one branch
 *   and GetFlushRecPtr() in the other.
 * - timeline is ThisTimeLineID, which GetStandbyFlushRecPtr() also updates.
 * - dbname is NULL unless MyDatabaseId is a valid OID.
 * NOTE(review): this listing is missing source lines 381, 394, 396-397, 409,
 * 412, 414-416, 418 and 421. Judging from the References list, those are:
 * - the declaration and creation of 'dest' (CreateDestReceiver/DestRemoteSimple);
 * - the GetSystemIdentifier() argument;
 * - the branch condition (presumably am_cascading_walsender/RecoveryInProgress);
 * - the transaction and memory-context handling around get_database_name().
 * Confirm against walsender.c.
 */
376 {
377  char sysid[32];
378  char xloc[MAXFNAMELEN];
379  XLogRecPtr logptr;
380  char *dbname = NULL;
382  TupOutputState *tstate;
383  TupleDesc tupdesc;
384  Datum values[4];
385  bool nulls[4];
386 
387  /*
388  * Reply with a result set with one row, four columns. First col is system
389  * ID, second is timeline ID, third is current xlog location and the
390  * fourth contains the database name if we are connected to one.
391  */
392 
393  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
395 
398  {
399  /* this also updates ThisTimeLineID */
400  logptr = GetStandbyFlushRecPtr();
401  }
402  else
403  logptr = GetFlushRecPtr();
404 
405  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
406 
407  if (MyDatabaseId != InvalidOid)
408  {
410 
411  /* syscache access needs a transaction env. */
413  /* make dbname live outside TX context */
417  /* CommitTransactionCommand switches to TopMemoryContext */
419  }
420 
422  MemSet(nulls, false, sizeof(nulls));
423 
424  /* need a tuple descriptor representing four columns */
425  tupdesc = CreateTemplateTupleDesc(4);
426  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
427  TEXTOID, -1, 0);
428  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
429  INT4OID, -1, 0);
430  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
431  TEXTOID, -1, 0);
432  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
433  TEXTOID, -1, 0);
434 
435  /* prepare for projection of tuples */
436  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
437 
438  /* column 1: system identifier */
439  values[0] = CStringGetTextDatum(sysid);
440 
441  /* column 2: timeline */
442  values[1] = Int32GetDatum(ThisTimeLineID);
443 
444  /* column 3: wal location */
445  values[2] = CStringGetTextDatum(xloc);
446 
447  /* column 4: database name, or NULL if none */
448  if (dbname)
449  values[3] = CStringGetTextDatum(dbname);
450  else
451  nulls[3] = true;
452 
453  /* send it to dest */
454  do_tup_output(tstate, values, nulls);
455 
456  end_tup_output(tstate);
457 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
void CommitTransactionCommand(void)
Definition: xact.c:2948
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:996
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8500
bool RecoveryInProgress(void)
Definition: xlog.c:8148
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
unsigned int uint32
Definition: c.h:429
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:86
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:193
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2847
char * dbname
Definition: streamutil.c:51
static Datum values[MAXATTR]
Definition: bootstrap.c:165
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4941
#define Int32GetDatum(X)
Definition: postgres.h:479
#define CStringGetTextDatum(s)
Definition: builtins.h:82
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2958
#define snprintf
Definition: port.h:215
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:472
bool am_cascading_walsender
Definition: walsender.c:116

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2381 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

/*
 * InitWalSenderSlot: claims the first WalSndCtl->walsnds[] entry whose pid is
 * 0 (checked under the entry's spinlock) and initialises it:
 * - pid = MyProcPid, state = WALSNDSTATE_STARTUP;
 * - sent/write/flush/apply positions invalid;
 * - lags = -1, sync_standby_priority = 0, replyTime = 0;
 * - latch = &MyProc->procLatch.
 * It then sets MyWalSnd to that entry and asserts one was found.
 * NOTE(review): source line 2436 is missing from this listing. The References
 * list (on_shmem_exit, WalSndKill) suggests it registers WalSndKill as an exit
 * callback; confirm against walsender.c.
 */
2382 {
2383  int i;
2384 
2385  /*
2386  * WalSndCtl should be set up already (we inherit this by fork() or
2387  * EXEC_BACKEND mechanism from the postmaster).
2388  */
2389  Assert(WalSndCtl != NULL);
2390  Assert(MyWalSnd == NULL);
2391 
2392  /*
2393  * Find a free walsender slot and reserve it. This must not fail due to
2394  * the prior check for free WAL senders in InitProcess().
2395  */
2396  for (i = 0; i < max_wal_senders; i++)
2397  {
2398  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2399 
2400  SpinLockAcquire(&walsnd->mutex);
2401 
2402  if (walsnd->pid != 0)
2403  {
2404  SpinLockRelease(&walsnd->mutex);
2405  continue;
2406  }
2407  else
2408  {
2409  /*
2410  * Found a free slot. Reserve it for us.
2411  */
2412  walsnd->pid = MyProcPid;
2413  walsnd->state = WALSNDSTATE_STARTUP;
2414  walsnd->sentPtr = InvalidXLogRecPtr;
2415  walsnd->needreload = false;
2416  walsnd->write = InvalidXLogRecPtr;
2417  walsnd->flush = InvalidXLogRecPtr;
2418  walsnd->apply = InvalidXLogRecPtr;
2419  walsnd->writeLag = -1;
2420  walsnd->flushLag = -1;
2421  walsnd->applyLag = -1;
2422  walsnd->sync_standby_priority = 0;
2423  walsnd->latch = &MyProc->procLatch;
2424  walsnd->replyTime = 0;
2425  SpinLockRelease(&walsnd->mutex);
2426  /* don't need the lock anymore */
2427  MyWalSnd = (WalSnd *) walsnd;
2428 
2429  break;
2430  }
2431  }
2432 
2433  Assert(MyWalSnd != NULL);
2434 
2435  /* Arrange to clean up at walsender exit */
2437 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:41
PGPROC * MyProc
Definition: proc.c:68
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2441
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:792
int sync_standby_priority
bool needreload
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3577 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

/*
 * LagTrackerRead: consumes samples for read head 'head' up to 'lsn'.
 * Returns now minus the matching local flush time (microseconds, per the
 * closing comment). When no sample was crossed, it interpolates between
 * last_read[head] and the next sample, or uses the next sample's time if there
 * is no previous one. Returns -1 when:
 * - the clock went backwards;
 * - there is no future sample;
 * - lsn is behind the previous sample.
 * NOTE(review): source lines 3587, 3589, 3599, 3618 and 3628 are missing from
 * this listing. They are the read-head advance expressions, the empty-buffer
 * test guarding the last_read reset, the "no future sample" test, and the
 * declaration of 'next'.
 */
3578 {
3579  TimestampTz time = 0;
3580 
3581  /* Read all unread samples up to this LSN or end of buffer. */
3582  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3583  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3584  {
3585  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3586  lag_tracker->last_read[head] =
3588  lag_tracker->read_heads[head] =
3590  }
3591 
3592  /*
3593  * If the lag tracker is empty, that means the standby has processed
3594  * everything we've ever sent so we should now clear 'last_read'. If we
3595  * didn't do that, we'd risk using a stale and irrelevant sample for
3596  * interpolation at the beginning of the next burst of WAL after a period
3597  * of idleness.
3598  */
3600  lag_tracker->last_read[head].time = 0;
3601 
3602  if (time > now)
3603  {
3604  /* If the clock somehow went backwards, treat as not found. */
3605  return -1;
3606  }
3607  else if (time == 0)
3608  {
3609  /*
3610  * We didn't cross a time. If there is a future sample that we
3611  * haven't reached yet, and we've already reached at least one sample,
3612  * let's interpolate the local flushed time. This is mainly useful
3613  * for reporting a completely stuck apply position as having
3614  * increasing lag, since otherwise we'd have to wait for it to
3615  * eventually start moving again and cross one of our samples before
3616  * we can show the lag increasing.
3617  */
3619  {
3620  /* There are no future samples, so we can't interpolate. */
3621  return -1;
3622  }
3623  else if (lag_tracker->last_read[head].time != 0)
3624  {
3625  /* We can interpolate between last_read and the next sample. */
3626  double fraction;
3627  WalTimeSample prev = lag_tracker->last_read[head];
3629 
3630  if (lsn < prev.lsn)
3631  {
3632  /*
3633  * Reported LSNs shouldn't normally go backwards, but it's
3634  * possible when there is a timeline change. Treat as not
3635  * found.
3636  */
3637  return -1;
3638  }
3639 
3640  Assert(prev.lsn < next.lsn);
3641 
3642  if (prev.time > next.time)
3643  {
3644  /* If the clock somehow went backwards, treat as not found. */
3645  return -1;
3646  }
3647 
3648  /* See how far we are between the previous and next samples. */
3649  fraction =
3650  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3651 
3652  /* Scale the local flush time proportionally. */
3653  time = (TimestampTz)
3654  ((double) prev.time + (next.time - prev.time) * fraction);
3655  }
3656  else
3657  {
3658  /*
3659  * We have only a future sample, implying that we were entirely
3660  * caught up and now there is a new burst of WAL and the
3661  * standby hasn't processed the first sample yet. Until the
3662  * standby reaches the future sample the best we can do is report
3663  * the hypothetical lag if that sample were to be replayed now.
3664  */
3665  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3666  }
3667  }
3668 
3669  /* Return the elapsed time since local flush time in microseconds. */
3670  Assert(time != 0);
3671  return now - time;
3672 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static int32 next
Definition: blutils.c:219
int write_head
Definition: walsender.c:214
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
XLogRecPtr lsn
Definition: walsender.c:202
#define Assert(condition)
Definition: c.h:792
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3512 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

/*
 * LagTrackerWrite: records a (lsn, local_flush_time) sample in the lag
 * tracker's circular buffer. It does nothing when the process is not a
 * walsender, or when lsn equals the last recorded lsn. If advancing
 * write_head would hit any read head, the buffer is full: the most recent
 * sample slot is overwritten instead of advancing.
 * NOTE(review): source lines 3552, 3554 and 3558 are missing from this listing.
 * They presumably rewind write_head by one (wrapping to
 * LAG_TRACKER_BUFFER_SIZE - 1 at 0) and store lsn into the buffer; confirm
 * against walsender.c.
 */
3513 {
3514  bool buffer_full;
3515  int new_write_head;
3516  int i;
3517 
3518  if (!am_walsender)
3519  return;
3520 
3521  /*
3522  * If the lsn hasn't advanced since last time, then do nothing. This way
3523  * we only record a new sample when new WAL has been written.
3524  */
3525  if (lag_tracker->last_lsn == lsn)
3526  return;
3527  lag_tracker->last_lsn = lsn;
3528 
3529  /*
3530  * If advancing the write head of the circular buffer would crash into any
3531  * of the read heads, then the buffer is full. In other words, the
3532  * slowest reader (presumably apply) is the one that controls the release
3533  * of space.
3534  */
3535  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3536  buffer_full = false;
3537  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3538  {
3539  if (new_write_head == lag_tracker->read_heads[i])
3540  buffer_full = true;
3541  }
3542 
3543  /*
3544  * If the buffer is full, for now we just rewind by one slot and overwrite
3545  * the last sample, as a simple (if somewhat uneven) way to lower the
3546  * sampling rate. There may be better adaptive compaction algorithms.
3547  */
3548  if (buffer_full)
3549  {
3550  new_write_head = lag_tracker->write_head;
3551  if (lag_tracker->write_head > 0)
3553  else
3555  }
3556 
3557  /* Store a sample at the current write head position. */
3559  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3560  lag_tracker->write_head = new_write_head;
3561 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
int write_head
Definition: walsender.c:214
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
bool am_walsender
Definition: walsender.c:115
XLogRecPtr lsn
Definition: walsender.c:202
XLogRecPtr last_lsn
Definition: walsender.c:212
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *  state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 814 of file walsender.c.

References CheckXLogRemoved(), XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

/*
 * logical_read_xlog_page: page-read callback; the cross-reference shows it is
 * used by CreateReplicationSlot() and StartLogicalReplication(). It:
 * - determines the timeline;
 * - waits via WalSndWaitForWal() until targetPagePtr + reqLen is flushed;
 * - reads the page with WALRead();
 * - checks with CheckXLogRemoved() that the segment was not recycled during
 *   the read.
 * Returns the number of valid bytes on the page (at most XLOG_BLCKSZ), or -1
 * if the requested WAL is not available.
 * NOTE(review): WALRead() is asked for XLOG_BLCKSZ bytes even when only
 * 'count' bytes are flushed; confirm this is intended.
 * NOTE(review): source lines 823 and 825 are missing from this listing. Per
 * the References list they presumably set sendTimeLineValidUpto (from
 * currTLIValidUntil) and sendTimeLineIsHistoric.
 */
816 {
817  XLogRecPtr flushptr;
818  int count;
819  WALReadError errinfo;
820  XLogSegNo segno;
821 
822  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
824  sendTimeLine = state->currTLI;
826  sendTimeLineNextTLI = state->nextTLI;
827 
828  /* make sure we have enough WAL available */
829  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
830 
831  /* fail if not (implies we are going to shut down) */
832  if (flushptr < targetPagePtr + reqLen)
833  return -1;
834 
835  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
836  count = XLOG_BLCKSZ; /* more than one block available */
837  else
838  count = flushptr - targetPagePtr; /* part of the page available */
839 
840  /* now actually read the data, we know it's there */
841  if (!WALRead(state,
842  cur_page,
843  targetPagePtr,
844  XLOG_BLCKSZ,
845  state->seg.ws_tli, /* Pass the current TLI because only
846  * WalSndSegmentOpen controls whether new
847  * TLI is needed. */
848  &errinfo))
849  WALReadRaiseError(&errinfo);
850 
851  /*
852  * After reading into the buffer, check that what we read was valid. We do
853  * this after reading, because even though the segment was present when we
854  * opened it, it might get recycled or removed while we read it. The
855  * read() succeeds in that case, but the data we tried to read might
856  * already have been overwritten with new WAL records.
857  */
858  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
859  CheckXLogRemoved(segno, state->seg.ws_tli);
860 
861  return count;
862 }
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:957
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:688
WALOpenSegment seg
Definition: xlogreader.h:215
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3956
uint64 XLogSegNo
Definition: xlogdefs.h:41
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:238
TimeLineID nextTLI
Definition: xlogreader.h:244
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1375
TimeLineID ThisTimeLineID
Definition: xlog.c:193
TimeLineID currTLI
Definition: xlogreader.h:228
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
TimeLineID ws_tli
Definition: xlogreader.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1063
WALSegmentContext segcxt
Definition: xlogreader.h:214
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3239 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

/*
 * offset_to_interval: wraps a TimeOffset in a newly palloc'd Interval with
 * month and day set to 0, so the whole offset is carried in the time field.
 */
3240 {
3241  Interval *result = palloc(sizeof(Interval));
3242 
3243  result->month = 0;
3244  result->day = 0;
3245  result->time = offset;
3246 
3247  return result;
3248 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:950

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd *  cmd,
bool *  reserve_wal,
CRSSnapshotAction *  snapshot_action 
)
static

Definition at line 868 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

/*
 * parseCreateReplSlotOptions: parses cmd->options.
 * - export_snapshot and use_snapshot are accepted only for
 *   REPLICATION_KIND_LOGICAL and share one "given" flag, so they are mutually
 *   exclusive.
 * - reserve_wal is accepted only for REPLICATION_KIND_PHYSICAL.
 * - A repeated or wrong-kind option raises ERRCODE_SYNTAX_ERROR; an unknown
 *   option name raises elog(ERROR).
 * The output parameters are written only for options that are actually
 * present.
 * NOTE(review): source line 890 is missing from this listing. It is the false
 * arm of the export_snapshot ternary, presumably CRS_NOEXPORT_SNAPSHOT per the
 * References list.
 */
871 {
872  ListCell *lc;
873  bool snapshot_action_given = false;
874  bool reserve_wal_given = false;
875 
876  /* Parse options */
877  foreach(lc, cmd->options)
878  {
879  DefElem *defel = (DefElem *) lfirst(lc);
880 
881  if (strcmp(defel->defname, "export_snapshot") == 0)
882  {
883  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
884  ereport(ERROR,
885  (errcode(ERRCODE_SYNTAX_ERROR),
886  errmsg("conflicting or redundant options")));
887 
888  snapshot_action_given = true;
889  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
891  }
892  else if (strcmp(defel->defname, "use_snapshot") == 0)
893  {
894  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
895  ereport(ERROR,
896  (errcode(ERRCODE_SYNTAX_ERROR),
897  errmsg("conflicting or redundant options")));
898 
899  snapshot_action_given = true;
900  *snapshot_action = CRS_USE_SNAPSHOT;
901  }
902  else if (strcmp(defel->defname, "reserve_wal") == 0)
903  {
904  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
905  ereport(ERROR,
906  (errcode(ERRCODE_SYNTAX_ERROR),
907  errmsg("conflicting or redundant options")));
908 
909  reserve_wal_given = true;
910  *reserve_wal = true;
911  }
912  else
913  elog(ERROR, "unrecognized option: %s", defel->defname);
914  }
915 }
int errcode(int sqlerrcode)
Definition: elog.c:704
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:45
#define ereport(elevel,...)
Definition: elog.h:155
#define lfirst(lc)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define elog(elevel,...)
Definition: elog.h:228
char * defname
Definition: parsenodes.h:733

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3255 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, LSNGetDatum, max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

/*
 * pg_stat_get_wal_senders: set-returning function that materialises one row
 * per walsender slot with a non-zero pid into a tuplestore.
 * - Callers that are not members of DEFAULT_ROLE_READ_ALL_STATS see only the
 *   pid; all other columns are NULL.
 * - Invalid LSNs, negative lag values and a zero reply time are reported as
 *   NULL.
 * - A slot with an invalid flush position is reported with priority 0
 *   ("async").
 * NOTE(review): source lines 3300, 3309, 3312 and 3422 are missing from this
 * listing. They declare sentPtr, state and values[], and begin the values[10]
 * assignment for sync standbys (presumably a test of
 * SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY, per the References list).
 */
3256 {
3257 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3258  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3259  TupleDesc tupdesc;
3260  Tuplestorestate *tupstore;
3261  MemoryContext per_query_ctx;
3262  MemoryContext oldcontext;
3263  SyncRepStandbyData *sync_standbys;
3264  int num_standbys;
3265  int i;
3266 
3267  /* check to see if caller supports us returning a tuplestore */
3268  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3269  ereport(ERROR,
3270  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3271  errmsg("set-valued function called in context that cannot accept a set")));
3272  if (!(rsinfo->allowedModes & SFRM_Materialize))
3273  ereport(ERROR,
3274  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3275  errmsg("materialize mode required, but it is not allowed in this context")));
3276 
3277  /* Build a tuple descriptor for our result type */
3278  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3279  elog(ERROR, "return type must be a row type");
3280 
3281  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3282  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3283 
3284  tupstore = tuplestore_begin_heap(true, false, work_mem);
3285  rsinfo->returnMode = SFRM_Materialize;
3286  rsinfo->setResult = tupstore;
3287  rsinfo->setDesc = tupdesc;
3288 
3289  MemoryContextSwitchTo(oldcontext);
3290 
3291  /*
3292  * Get the currently active synchronous standbys. This could be out of
3293  * date before we're done, but we'll use the data anyway.
3294  */
3295  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3296 
3297  for (i = 0; i < max_wal_senders; i++)
3298  {
3299  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3301  XLogRecPtr write;
3302  XLogRecPtr flush;
3303  XLogRecPtr apply;
3304  TimeOffset writeLag;
3305  TimeOffset flushLag;
3306  TimeOffset applyLag;
3307  int priority;
3308  int pid;
3310  TimestampTz replyTime;
3311  bool is_sync_standby;
3313  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3314  int j;
3315 
3316  /* Collect data from shared memory */
3317  SpinLockAcquire(&walsnd->mutex);
3318  if (walsnd->pid == 0)
3319  {
3320  SpinLockRelease(&walsnd->mutex);
3321  continue;
3322  }
3323  pid = walsnd->pid;
3324  sentPtr = walsnd->sentPtr;
3325  state = walsnd->state;
3326  write = walsnd->write;
3327  flush = walsnd->flush;
3328  apply = walsnd->apply;
3329  writeLag = walsnd->writeLag;
3330  flushLag = walsnd->flushLag;
3331  applyLag = walsnd->applyLag;
3332  priority = walsnd->sync_standby_priority;
3333  replyTime = walsnd->replyTime;
3334  SpinLockRelease(&walsnd->mutex);
3335 
3336  /*
3337  * Detect whether walsender is/was considered synchronous. We can
3338  * provide some protection against stale data by checking the PID
3339  * along with walsnd_index.
3340  */
3341  is_sync_standby = false;
3342  for (j = 0; j < num_standbys; j++)
3343  {
3344  if (sync_standbys[j].walsnd_index == i &&
3345  sync_standbys[j].pid == pid)
3346  {
3347  is_sync_standby = true;
3348  break;
3349  }
3350  }
3351 
3352  memset(nulls, 0, sizeof(nulls));
3353  values[0] = Int32GetDatum(pid);
3354 
3355  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3356  {
3357  /*
3358  * Only superusers and members of pg_read_all_stats can see
3359  * details. Other users only get the pid value to know it's a
3360  * walsender, but no details.
3361  */
3362  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3363  }
3364  else
3365  {
3366  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3367 
3368  if (XLogRecPtrIsInvalid(sentPtr))
3369  nulls[2] = true;
3370  values[2] = LSNGetDatum(sentPtr);
3371 
3372  if (XLogRecPtrIsInvalid(write))
3373  nulls[3] = true;
3374  values[3] = LSNGetDatum(write);
3375 
3376  if (XLogRecPtrIsInvalid(flush))
3377  nulls[4] = true;
3378  values[4] = LSNGetDatum(flush);
3379 
3380  if (XLogRecPtrIsInvalid(apply))
3381  nulls[5] = true;
3382  values[5] = LSNGetDatum(apply);
3383 
3384  /*
3385  * Treat a standby such as a pg_basebackup background process
3386  * which always returns an invalid flush location, as an
3387  * asynchronous standby.
3388  */
3389  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3390 
3391  if (writeLag < 0)
3392  nulls[6] = true;
3393  else
3394  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3395 
3396  if (flushLag < 0)
3397  nulls[7] = true;
3398  else
3399  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3400 
3401  if (applyLag < 0)
3402  nulls[8] = true;
3403  else
3404  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3405 
3406  values[9] = Int32GetDatum(priority);
3407 
3408  /*
3409  * More easily understood version of standby state. This is purely
3410  * informational.
3411  *
3412  * In quorum-based sync replication, the role of each standby
3413  * listed in synchronous_standby_names can be changing very
3414  * frequently. Any standbys considered as "sync" at one moment can
3415  * be switched to "potential" ones at the next moment. So, it's
3416  * basically useless to report "sync" or "potential" as their sync
3417  * states. We report just "quorum" for them.
3418  */
3419  if (priority == 0)
3420  values[10] = CStringGetTextDatum("async");
3421  else if (is_sync_standby)
3423  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3424  else
3425  values[10] = CStringGetTextDatum("potential");
3426 
3427  if (replyTime == 0)
3428  nulls[11] = true;
3429  else
3430  values[11] = TimestampTzGetDatum(replyTime);
3431  }
3432 
3433  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3434  }
3435 
3436  /* clean up and return the tuplestore */
3437  tuplestore_donestoring(tupstore);
3438 
3439  return (Datum) 0;
3440 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:476
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:69
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:704
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:996
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:45
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3220
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:122
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define ereport(elevel,...)
Definition: elog.h:155
int allowedModes
Definition: execnodes.h:304
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4919
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:306
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:298
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:232
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:309
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:726
static Datum values[MAXATTR]
Definition: bootstrap.c:165
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:302
#define Int32GetDatum(X)
Definition: postgres.h:479
TupleDesc setDesc
Definition: execnodes.h:310
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define elog(elevel,...)
Definition: elog.h:228
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:82
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3239
XLogRecPtr apply

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1841 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

/*
 * PhysicalConfirmReceivedLocation: stores lsn (asserted valid) as the slot's
 * restart_lsn under the slot mutex. Follow-up work is done only if the value
 * changed. The slot is deliberately not saved to disk here.
 * NOTE(review): source lines 1844 and 1857-1858 are missing from this listing.
 * Per the References list, 1844 presumably initialises 'slot' from
 * MyReplicationSlot, and 1857-1858 call ReplicationSlotMarkDirty() and
 * ReplicationSlotsComputeRequiredLSN(); confirm against walsender.c.
 */
1842 {
1843  bool changed = false;
1845 
1846  Assert(lsn != InvalidXLogRecPtr);
1847  SpinLockAcquire(&slot->mutex);
1848  if (slot->data.restart_lsn != lsn)
1849  {
1850  changed = true;
1851  slot->data.restart_lsn = lsn;
1852  }
1853  SpinLockRelease(&slot->mutex);
1854 
1855  if (changed)
1856  {
1859  }
1860 
1861  /*
1862  * One could argue that the slot should be saved to disk now, but that'd
1863  * be energy wasted - the worst lost information can do here is give us
1864  * wrong information in a statistics view - we'll just potentially be more
1865  * conservative in removing files.
1866  */
1867 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:143
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:826
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:792
XLogRecPtr restart_lsn
Definition: slot.h:80
slock_t mutex
Definition: slot.h:116
void ReplicationSlotMarkDirty(void)
Definition: slot.c:737

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1978 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1979 {
1980  bool changed = false;
1981  ReplicationSlot *slot = MyReplicationSlot;
1982 
1983  SpinLockAcquire(&slot->mutex);
1984  MyProc->xmin = InvalidTransactionId;
1985 
1986  /*
1987  * For physical replication we don't need the interlock provided by xmin
1988  * and effective_xmin since the consequences of a missed increase are
1989  * limited to query cancellations, so set both at once.
1990  */
1991  if (!TransactionIdIsNormal(slot->data.xmin) ||
1992  !TransactionIdIsNormal(feedbackXmin) ||
1993  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1994  {
1995  changed = true;
1996  slot->data.xmin = feedbackXmin;
1997  slot->effective_xmin = feedbackXmin;
1998  }
1999  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2000  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2001  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2002  {
2003  changed = true;
2004  slot->data.catalog_xmin = feedbackCatalogXmin;
2005  slot->effective_catalog_xmin = feedbackCatalogXmin;
2006  }
2007  SpinLockRelease(&slot->mutex);
2008 
2009  if (changed)
2010  {
2011  ReplicationSlotMarkDirty();
2012  ReplicationSlotsComputeRequiredXmin(false);
2013  }
2014 }
PGPROC * MyProc
Definition: proc.c:68
ReplicationSlotPersistentData data
Definition: slot.h:143
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:139
TransactionId xmin
Definition: proc.h:138
TransactionId catalog_xmin
Definition: slot.h:77
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:69
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:140
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:116
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:776
void ReplicationSlotMarkDirty(void)
Definition: slot.c:737

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1714 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1715 {
1716  unsigned char firstchar;
1717  int r;
1718  bool received = false;
1719 
1720  last_processing = GetCurrentTimestamp();
1721 
1722  /*
1723  * If we already received a CopyDone from the frontend, any subsequent
1724  * message is the beginning of a new command, and should be processed in
1725  * the main processing loop.
1726  */
1727  while (!streamingDoneReceiving)
1728  {
1729  pq_startmsgread();
1730  r = pq_getbyte_if_available(&firstchar);
1731  if (r < 0)
1732  {
1733  /* unexpected error or EOF */
1734  ereport(COMMERROR,
1735  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1736  errmsg("unexpected EOF on standby connection")));
1737  proc_exit(0);
1738  }
1739  if (r == 0)
1740  {
1741  /* no data available without blocking */
1742  pq_endmsgread();
1743  break;
1744  }
1745 
1746  /* Read the message contents */
1747  resetStringInfo(&reply_message);
1748  if (pq_getmessage(&reply_message, 0))
1749  {
1750  ereport(COMMERROR,
1751  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1752  errmsg("unexpected EOF on standby connection")));
1753  proc_exit(0);
1754  }
1755 
1756  /* Handle the very limited subset of commands expected in this phase */
1757  switch (firstchar)
1758  {
1759  /*
1760  * 'd' means a standby reply wrapped in a CopyData packet.
1761  */
1762  case 'd':
1763  ProcessStandbyMessage();
1764  received = true;
1765  break;
1766 
1767  /*
1768  * CopyDone means the standby requested to finish streaming.
1769  * Reply with CopyDone, if we had not sent that already.
1770  */
1771  case 'c':
1772  if (!streamingDoneSending)
1773  {
1774  pq_putmessage_noblock('c', NULL, 0);
1775  streamingDoneSending = true;
1776  }
1777 
1778  streamingDoneReceiving = true;
1779  received = true;
1780  break;
1781 
1782  /*
1783  * 'X' means that the standby is closing down the socket.
1784  */
1785  case 'X':
1786  proc_exit(0);
1787 
1788  default:
1789  ereport(FATAL,
1790  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1791  errmsg("invalid standby message type \"%c\"",
1792  firstchar)));
1793  }
1794  }
1795 
1796  /*
1797  * Save the last reply timestamp if we've received at least one reply.
1798  */
1799  if (received)
1800  {
1801  last_reply_timestamp = last_processing;
1802  waiting_for_ping_response = false;
1803  }
1804 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1810
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:704
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1209
#define FATAL
Definition: elog.h:54
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1039
static bool streamingDoneSending
Definition: walsender.c:179
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1271
static StringInfoData reply_message
Definition: walsender.c:158
void pq_endmsgread(void)
Definition: pqcomm.c:1233
#define ereport(elevel,...)
Definition: elog.h:155
static bool streamingDoneReceiving
Definition: walsender.c:180
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2058 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

2059 {
2060  TransactionId feedbackXmin;
2061  uint32 feedbackEpoch;
2062  TransactionId feedbackCatalogXmin;
2063  uint32 feedbackCatalogEpoch;
2064  TimestampTz replyTime;
2065 
2066  /*
2067  * Decipher the reply message. The caller already consumed the msgtype
2068  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2069  * of this message.
2070  */
2071  replyTime = pq_getmsgint64(&reply_message);
2072  feedbackXmin = pq_getmsgint(&reply_message, 4);
2073  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2074  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2075  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2076 
2077  if (message_level_is_interesting(DEBUG2))
2078  {
2079  char *replyTimeStr;
2080 
2081  /* Copy because timestamptz_to_str returns a static buffer */
2082  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2083 
2084  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2085  feedbackXmin,
2086  feedbackEpoch,
2087  feedbackCatalogXmin,
2088  feedbackCatalogEpoch,
2089  replyTimeStr);
2090 
2091  pfree(replyTimeStr);
2092  }
2093 
2094  /*
2095  * Update shared state for this WalSender process based on reply data from
2096  * standby.
2097  */
2098  {
2099  WalSnd *walsnd = MyWalSnd;
2100 
2101  SpinLockAcquire(&walsnd->mutex);
2102  walsnd->replyTime = replyTime;
2103  SpinLockRelease(&walsnd->mutex);
2104  }
2105 
2106  /*
2107  * Unset WalSender's xmins if the feedback message values are invalid.
2108  * This happens when the downstream turned hot_standby_feedback off.
2109  */
2110  if (!TransactionIdIsNormal(feedbackXmin)
2111  && !TransactionIdIsNormal(feedbackCatalogXmin))
2112  {
2113  MyProc->xmin = InvalidTransactionId;
2114  if (MyReplicationSlot != NULL)
2115  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2116  return;
2117  }
2118 
2119  /*
2120  * Check that the provided xmin/epoch are sane, that is, not in the future
2121  * and not so far back as to be already wrapped around. Ignore if not.
2122  */
2123  if (TransactionIdIsNormal(feedbackXmin) &&
2124  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2125  return;
2126 
2127  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2128  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2129  return;
2130 
2131  /*
2132  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2133  * the xmin will be taken into account by GetSnapshotData() /
2134  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2135  * thereby prevent the generation of cleanup conflicts on the standby
2136  * server.
2137  *
2138  * There is a small window for a race condition here: although we just
2139  * checked that feedbackXmin precedes nextXid, the nextXid could have
2140  * gotten advanced between our fetching it and applying the xmin below,
2141  * perhaps far enough to make feedbackXmin wrap around. In that case the
2142  * xmin we set here would be "in the future" and have no effect. No point
2143  * in worrying about this since it's too late to save the desired data
2144  * anyway. Assuming that the standby sends us an increasing sequence of
2145  * xmins, this could only happen during the first reply cycle, else our
2146  * own xmin would prevent nextXid from advancing so far.
2147  *
2148  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2149  * is assumed atomic, and there's no real need to prevent concurrent
2150  * horizon determinations. (If we're moving our xmin forward, this is
2151  * obviously safe, and if we're moving it backwards, well, the data is at
2152  * risk already since a VACUUM could already have determined the horizon.)
2153  *
2154  * If we're using a replication slot we reserve the xmin via that,
2155  * otherwise via the walsender's PGPROC entry. We can only track the
2156  * catalog xmin separately when using a slot, so we store the least of the
2157  * two provided when not using a slot.
2158  *
2159  * XXX: It might make sense to generalize the ephemeral slot concept and
2160  * always use the slot mechanism to handle the feedback xmin.
2161  */
2162  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2163  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2164  else
2165  {
2166  if (TransactionIdIsNormal(feedbackCatalogXmin)
2167  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2168  MyProc->xmin = feedbackCatalogXmin;
2169  else
2170  MyProc->xmin = feedbackXmin;
2171  }
2172 }
uint32 TransactionId
Definition: c.h:575
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1187
slock_t mutex
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2027
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1057
TransactionId xmin
Definition: proc.h:138
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:429
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1978
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1810 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1811 {
1812  char msgtype;
1813 
1814  /*
1815  * Check message type from the first byte.
1816  */
1817  msgtype = pq_getmsgbyte(&reply_message);
1818 
1819  switch (msgtype)
1820  {
1821  case 'r':
1822  ProcessStandbyReplyMessage();
1823  break;
1824 
1825  case 'h':
1826  ProcessStandbyHSFeedbackMessage();
1827  break;
1828 
1829  default:
1830  ereport(COMMERROR,
1831  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1832  errmsg("unexpected message type \"%c\"", msgtype)));
1833  proc_exit(0);
1834  }
1835 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:704
#define COMMERROR
Definition: elog.h:30
static StringInfoData reply_message
Definition: walsender.c:158
#define ereport(elevel,...)
Definition: elog.h:155
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1873
int errmsg(const char *fmt,...)
Definition: elog.c:915
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2058

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1873 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1874 {
1875  XLogRecPtr writePtr,
1876  flushPtr,
1877  applyPtr;
1878  bool replyRequested;
1879  TimeOffset writeLag,
1880  flushLag,
1881  applyLag;
1882  bool clearLagTimes;
1883  TimestampTz now;
1884  TimestampTz replyTime;
1885 
1886  static bool fullyAppliedLastTime = false;
1887 
1888  /* the caller already consumed the msgtype byte */
1889  writePtr = pq_getmsgint64(&reply_message);
1890  flushPtr = pq_getmsgint64(&reply_message);
1891  applyPtr = pq_getmsgint64(&reply_message);
1892  replyTime = pq_getmsgint64(&reply_message);
1893  replyRequested = pq_getmsgbyte(&reply_message);
1894 
1895  if (message_level_is_interesting(DEBUG2))
1896  {
1897  char *replyTimeStr;
1898 
1899  /* Copy because timestamptz_to_str returns a static buffer */
1900  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1901 
1902  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1903  (uint32) (writePtr >> 32), (uint32) writePtr,
1904  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1905  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1906  replyRequested ? " (reply requested)" : "",
1907  replyTimeStr);
1908 
1909  pfree(replyTimeStr);
1910  }
1911 
1912  /* See if we can compute the round-trip lag for these positions. */
1913  now = GetCurrentTimestamp();
1914  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1915  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1916  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1917 
1918  /*
1919  * If the standby reports that it has fully replayed the WAL in two
1920  * consecutive reply messages, then the second such message must result
1921  * from wal_receiver_status_interval expiring on the standby. This is a
1922  * convenient time to forget the lag times measured when it last
1923  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1924  * until more WAL traffic arrives.
1925  */
1926  clearLagTimes = false;
1927  if (applyPtr == sentPtr)
1928  {
1929  if (fullyAppliedLastTime)
1930  clearLagTimes = true;
1931  fullyAppliedLastTime = true;
1932  }
1933  else
1934  fullyAppliedLastTime = false;
1935 
1936  /* Send a reply if the standby requested one. */
1937  if (replyRequested)
1938  WalSndKeepalive(false);
1939 
1940  /*
1941  * Update shared state for this WalSender process based on reply data from
1942  * standby.
1943  */
1944  {
1945  WalSnd *walsnd = MyWalSnd;
1946 
1947  SpinLockAcquire(&walsnd->mutex);
1948  walsnd->write = writePtr;
1949  walsnd->flush = flushPtr;
1950  walsnd->apply = applyPtr;
1951  if (writeLag != -1 || clearLagTimes)
1952  walsnd->writeLag = writeLag;
1953  if (flushLag != -1 || clearLagTimes)
1954  walsnd->flushLag = flushLag;
1955  if (applyLag != -1 || clearLagTimes)
1956  walsnd->applyLag = applyLag;
1957  walsnd->replyTime = replyTime;
1958  SpinLockRelease(&walsnd->mutex);
1959  }
1960 
1961  if (!am_cascading_walsender)
1962  SyncRepReleaseWaiters();
1963 
1964  /*
1965  * Advance our local xmin horizon when the client confirmed a flush.
1966  */
1967  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
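1968  {
1969  if (SlotIsLogical(MyReplicationSlot))
1970  LogicalConfirmReceivedLocation(flushPtr);
1971  else
1972  PhysicalConfirmReceivedLocation(flushPtr);
1973  }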
1974 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1187
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1057
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3451
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1841
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
unsigned int uint32
Definition: c.h:429
#define SlotIsLogical(slot)
Definition: slot.h:165
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3577
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1660
#define elog(elevel,...)
Definition: elog.h:228
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:441
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
bool am_cascading_walsender
Definition: walsender.c:116
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1772

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 464 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

465 {
466  StringInfoData buf;
467  char histfname[MAXFNAMELEN];
468  char path[MAXPGPATH];
469  int fd;
470  off_t histfilelen;
471  off_t bytesleft;
472  Size len;
473 
474  /*
475  * Reply with a result set with one row, and two columns. The first col is
476  * the name of the history file, 2nd is the contents.
477  */
478 
479  TLHistoryFileName(histfname, cmd->timeline);
480  TLHistoryFilePath(path, cmd->timeline);
481 
482  /* Send a RowDescription message */
483  pq_beginmessage(&buf, 'T');
484  pq_sendint16(&buf, 2); /* 2 fields */
485 
486  /* first field */
487  pq_sendstring(&buf, "filename"); /* col name */
488  pq_sendint32(&buf, 0); /* table oid */
489  pq_sendint16(&buf, 0); /* attnum */
490  pq_sendint32(&buf, TEXTOID); /* type oid */
491  pq_sendint16(&buf, -1); /* typlen */
492  pq_sendint32(&buf, 0); /* typmod */
493  pq_sendint16(&buf, 0); /* format code */
494 
495  /* second field */
496  pq_sendstring(&buf, "content"); /* col name */
497  pq_sendint32(&buf, 0); /* table oid */
498  pq_sendint16(&buf, 0); /* attnum */
499  pq_sendint32(&buf, TEXTOID); /* type oid */
500  pq_sendint16(&buf, -1); /* typlen */
501  pq_sendint32(&buf, 0); /* typmod */
502  pq_sendint16(&buf, 0); /* format code */
503  pq_endmessage(&buf);
504 
505  /* Send a DataRow message */
506  pq_beginmessage(&buf, 'D');
507  pq_sendint16(&buf, 2); /* # of columns */
508  len = strlen(histfname);
509  pq_sendint32(&buf, len); /* col1 len */
510  pq_sendbytes(&buf, histfname, len);
511 
512  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
513  if (fd < 0)
514  ereport(ERROR,
515  (errcode_for_file_access(),
516  errmsg("could not open file \"%s\": %m", path)));
517 
518  /* Determine file length and send it to client */
519  histfilelen = lseek(fd, 0, SEEK_END);
520  if (histfilelen < 0)
521  ereport(ERROR,
522  (errcode_for_file_access(),
523  errmsg("could not seek to end of file \"%s\": %m", path)));
524  if (lseek(fd, 0, SEEK_SET) != 0)
525  ereport(ERROR,
526  (errcode_for_file_access(),
527  errmsg("could not seek to beginning of file \"%s\": %m", path)));
528 
529  pq_sendint32(&buf, histfilelen); /* col2 len */
530 
531  bytesleft = histfilelen;
532  while (bytesleft > 0)
533  {
534  PGAlignedBlock rbuf;
535  int nread;
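536 
537  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
538  nread = read(fd, rbuf.data, sizeof(rbuf));
539  pgstat_report_wait_end();
540  if (nread < 0)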
541  ereport(ERROR,
542  (errcode_for_file_access(),
543  errmsg("could not read file \"%s\": %m",
544  path)));
545  else if (nread == 0)
546  ereport(ERROR,
547  (errcode(ERRCODE_DATA_CORRUPTED),
548  errmsg("could not read file \"%s\": read %d of %zu",
549  path, nread, (Size) bytesleft)));
550 
551  pq_sendbytes(&buf, rbuf.data, nread);
552  bytesleft -= nread;
553  }
554 
555  if (CloseTransientFile(fd) != 0)
556  ereport(ERROR,
557  (errcode_for_file_access(),
558  errmsg("could not close file \"%s\": %m", path)));
559 
560  pq_endmessage(&buf);
561 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
int errcode(int sqlerrcode)
Definition: elog.c:704
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1259
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:97
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1129
#define ERROR
Definition: elog.h:45
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2404
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:68
int errcode_for_file_access(void)
Definition: elog.c:727
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1512
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2581
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:155
size_t Size
Definition: c.h:528
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1488
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:915
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1133 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, SAB_Error, WalSnd::sentPtr, sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), XLogRecPtrIsInvalid, and XLogSendLogical().

Referenced by exec_replication_command().

1134 {
1135  StringInfoData buf;
1136  QueryCompletion qc;
1137 
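1138  /* make sure that our requirements are still fulfilled */
1139  CheckLogicalDecodingRequirements();
1140 
1141  Assert(!MyReplicationSlot);
1142 
1143  (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
1144 
1145  if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
1146  ereport(ERROR,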
1147  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1148  errmsg("cannot read from logical replication slot \"%s\"",
1149  cmd->slotname),
1150  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1151 
1152  /*
1153  * Force a disconnect, so that the decoding code doesn't need to care
1154  * about an eventual switch from running in recovery, to running in a
1155  * normal environment. Client code is expected to handle reconnects.
1156  */
1157  if (am_cascading_walsender && !RecoveryInProgress())
1158  {
1159  ereport(LOG,
1160  (errmsg("terminating walsender process after promotion")));
1161  got_STOPPING = true;
1162  }
1163 
1164  /*
1165  * Create our decoding context, making it start at the previously ack'ed
1166  * position.
1167  *
1168  * Do this before sending a CopyBothResponse message, so that any errors
1169  * are reported early.
1170  */
1171  logical_decoding_ctx =
1172  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1173  XL_ROUTINE(.page_read = logical_read_xlog_page,
1174  .segment_open = WalSndSegmentOpen,
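1175  .segment_close = wal_segment_close),
1176  WalSndPrepareWrite, WalSndWriteData,
1177  WalSndUpdateProgress);
1178  xlogreader = logical_decoding_ctx->reader;
1179 
1180  WalSndSetState(WALSNDSTATE_CATCHUP);
1181 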
1182  /* Send a CopyBothResponse message, and start streaming */
1183  pq_beginmessage(&buf, 'W');
1184  pq_sendbyte(&buf, 0);
1185  pq_sendint16(&buf, 0);
1186  pq_endmessage(&buf);
1187  pq_flush();
1188 
1189  /* Start reading WAL from the oldest required WAL. */
1190  XLogBeginRead(logical_decoding_ctx->reader,
1191  MyReplicationSlot->data.restart_lsn);
1192 
1193  /*
1194  * Report the location after which we'll send out further commits as the
1195  * current sentPtr.
1196  */
1197  sentPtr = MyReplicationSlot->data.confirmed_flush;
1198 
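1199  /* Also update the sent position status in shared memory */
1200  SpinLockAcquire(&MyWalSnd->mutex);
1201  MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1202  SpinLockRelease(&MyWalSnd->mutex);
1203 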
1204  replication_active = true;
1205 
1206  SyncRepInitConfig();
1207 
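1208  /* Main loop of walsender */
1209  WalSndLoop(XLogSendLogical);
1210 
1211  FreeDecodingContext(logical_decoding_ctx);
1212  ReplicationSlotRelease();
1213 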
1214  replication_active = false;
1215  if (got_STOPPING)
1216  proc_exit(0);
1217  WalSndSetState(WALSNDSTATE_STARTUP);
1218 
1219  /* Get out of COPY mode (CommandComplete). */
1220  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1221  EndCommand(&qc, DestRemote, false);
1222 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:811
#define pq_flush()
Definition: libpq.h:39
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:704
ReplicationSlotPersistentData data
Definition: slot.h:143
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8148
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:91
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:45
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:470
static char * buf
Definition: pg_test_fsync.c:68
int errdetail(const char *fmt,...)
Definition: elog.c:1048
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:240
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1260
void ReplicationSlotRelease(void)
Definition: slot.c:484
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:412
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2253
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1233
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2459
#define ereport(elevel,...)
Definition: elog.h:155
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:792
void WalSndSetState(WalSndState state)
Definition: walsender.c:3201
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:605
static void XLogSendLogical(void)
Definition: walsender.c:2838
XLogRecPtr restart_lsn
Definition: slot.h:80
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int errmsg(const char *fmt,...)
Definition: elog.c:915
XLogReaderState * reader
Definition: logical.h:41
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:375
Definition: slot.h:42
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:814
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
bool am_cascading_walsender
Definition: walsender.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1349

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 570 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), SAB_Error, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

571 {
573  XLogRecPtr FlushPtr;
574 
575  if (ThisTimeLineID == 0)
576  ereport(ERROR,
577  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
578  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
579 
580  /* create xlogreader for physical replication */
581  xlogreader =
583  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
584  .segment_close = wal_segment_close),
585  NULL);
586 
587  if (!xlogreader)
588  ereport(ERROR,
589  (errcode(ERRCODE_OUT_OF_MEMORY),
590  errmsg("out of memory")));
591 
592  /*
593  * We assume here that we're logging enough information in the WAL for
594  * log-shipping, since this is checked in PostmasterMain().
595  *
596  * NOTE: wal_level can only change at shutdown, so in most cases it is
597  * difficult for there to be WAL data that we can still see that was
598  * written at wal_level='minimal'.
599  */
600 
601  if (cmd->slotname)
602  {
605  ereport(ERROR,
606  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
607  errmsg("cannot use a logical replication slot for physical replication")));
608 
609  /*
610  * We don't need to verify the slot's restart_lsn here; instead we
611  * rely on the caller requesting the starting point to use. If the
612  * WAL segment doesn't exist, we'll fail later.
613  */
614  }
615 
616  /*
617  * Select the timeline. If it was given explicitly by the client, use
618  * that. Otherwise use the timeline of the last replayed record, which is
619  * kept in ThisTimeLineID.
620  */
622  {
623  /* this also updates ThisTimeLineID */
624  FlushPtr = GetStandbyFlushRecPtr();
625  }
626  else
627  FlushPtr = GetFlushRecPtr();
628 
629  if (cmd->timeline != 0)
630  {
631  XLogRecPtr switchpoint;
632 
633  sendTimeLine = cmd->timeline;
635  {
636  sendTimeLineIsHistoric = false;
638  }
639  else
640  {
641  List *timeLineHistory;
642 
643  sendTimeLineIsHistoric = true;
644 
645  /*
646  * Check that the timeline the client requested exists, and the
647  * requested start location is on that timeline.
648  */
649  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
650  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
652  list_free_deep(timeLineHistory);
653 
654  /*
655  * Found the requested timeline in the history. Check that
656  * requested startpoint is on that timeline in our history.
657  *
658  * This is quite loose on purpose. We only check that we didn't
659  * fork off the requested timeline before the switchpoint. We
660  * don't check that we switched *to* it before the requested
661  * starting point. This is because the client can legitimately
662  * request to start replication from the beginning of the WAL
663  * segment that contains switchpoint, but on the new timeline, so
664  * that it doesn't end up with a partial segment. If you ask for
665  * too old a starting point, you'll get an error later when we
666  * fail to find the requested WAL segment in pg_wal.
667  *
668  * XXX: we could be more strict here and only allow a startpoint
669  * that's older than the switchpoint, if it's still in the same
670  * WAL segment.
671  */
672  if (!XLogRecPtrIsInvalid(switchpoint) &&
673  switchpoint < cmd->startpoint)
674  {
675  ereport(ERROR,
676  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
677  (uint32) (cmd->startpoint >> 32),
678  (uint32) (cmd->startpoint),
679  cmd->timeline),
680  errdetail("This server's history forked from timeline %u at %X/%X.",
681  cmd->timeline,
682  (uint32) (switchpoint >> 32),
683  (uint32) (switchpoint))));
684  }
685  sendTimeLineValidUpto = switchpoint;
686  }
687  }
688  else
689  {
692  sendTimeLineIsHistoric = false;
693  }
694 
696 
697  /* If there is nothing to stream, don't even enter COPY mode */
699  {
700  /*
701  * When we first start replication the standby will be behind the
702  * primary. For some applications, for example synchronous
703  * replication, it is important to have a clear state for this initial
704  * catchup mode, so we can trigger actions when we change streaming
705  * state later. We may stay in this state for a long time, which is
706  * exactly why we want to be able to monitor whether or not we are
707  * still here.
708  */
710 
711  /* Send a CopyBothResponse message, and start streaming */
712  pq_beginmessage(&buf, 'W');
713  pq_sendbyte(&buf, 0);
714  pq_sendint16(&buf, 0);
715  pq_endmessage(&buf);
716  pq_flush();
717 
718  /*
719  * Don't allow a request to stream from a future point in WAL that
720  * hasn't been flushed to disk in this server yet.
721  */
722  if (FlushPtr < cmd->startpoint)
723  {
724  ereport(ERROR,
725  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
726  (uint32) (cmd->startpoint >> 32),
727  (uint32) (cmd->startpoint),
728  (uint32) (FlushPtr >> 32),
729  (uint32) (FlushPtr))));
730  }
731 
732  /* Start streaming from the requested point */
733  sentPtr = cmd->startpoint;
734 
735  /* Initialize shared memory status, too */
739 
741 
742  /* Main loop of walsender */
743  replication_active = true;
744 
746 
747  replication_active = false;
748  if (got_STOPPING)
749  proc_exit(0);
751 
753  }
754 
755  if (cmd->slotname)
757 
758  /*
759  * Copy is finished now. Send a single-row result set indicating the next
760  * timeline.
761  */
763  {
764  char startpos_str[8 + 1 + 8 + 1];
766  TupOutputState *tstate;
767  TupleDesc tupdesc;
768  Datum values[2];
769  bool nulls[2];
770 
771  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
772  (uint32) (sendTimeLineValidUpto >> 32),
774 
776  MemSet(nulls, false, sizeof(nulls));
777 
778  /*
779  * Need a tuple descriptor representing two columns. int8 may seem
780  * like a surprising data type for this, but in theory int4 would not
781  * be wide enough for this, as TimeLineID is unsigned.
782  */
783  tupdesc = CreateTemplateTupleDesc(2);
784  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
785  INT8OID, -1, 0);
786  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
787  TEXTOID, -1, 0);
788 
789  /* prepare for projection of tuple */
790  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
791 
792  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
793  values[1] = CStringGetTextDatum(startpos_str);
794 
795  /* send it to dest */
796  do_tup_output(tstate, values, nulls);
797 
798  end_tup_output(tstate);
799  }
800 
801  /* Send CommandComplete message */
802  EndReplicationCommand("START_STREAMING");
803 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:85
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:811
#define pq_flush()
Definition: libpq.h:39
int wal_segment_size
Definition: xlog.c:118
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
static void XLogSendPhysical(void)
Definition: walsender.c:2537
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:704
#define MemSet(start, val, len)
Definition: c.h:996
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8500
void list_free_deep(List *list)
Definition: list.c:1390
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:45
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
static char * buf
Definition: pg_test_fsync.c:68
static bool streamingDoneSending
Definition: walsender.c:179
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:1048
unsigned int uint32
Definition: c.h:429
void ReplicationSlotRelease(void)
Definition: slot.c:484
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:75
#define SlotIsLogical(slot)
Definition: slot.h:165
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1700
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:412
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2253
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:367
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2459
TimeLineID ThisTimeLineID
Definition: xlog.c:193
#define ereport(elevel,...)
Definition: elog.h:155
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:792
void WalSndSetState(WalSndState state)
Definition: walsender.c:3201
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
static bool streamingDoneReceiving
Definition: walsender.c:180
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int errmsg(const char *fmt,...)
Definition: elog.c:915
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:375
Definition: slot.h:42
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:82
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2958
Definition: pg_list.h:50
#define snprintf
Definition: port.h:215
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
bool am_cascading_walsender
Definition: walsender.c:116

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2027 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

2028 {
2029  FullTransactionId nextFullXid;
2030  TransactionId nextXid;
2031  uint32 nextEpoch;
2032 
2033  nextFullXid = ReadNextFullTransactionId();
2034  nextXid = XidFromFullTransactionId(nextFullXid);
2035  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2036 
2037  if (xid <= nextXid)
2038  {
2039  if (epoch != nextEpoch)
2040  return false;
2041  }
2042  else
2043  {
2044  if (epoch + 1 != nextEpoch)
2045  return false;
2046  }
2047 
2048  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2049  return false; /* epoch OK, but it's wrapped around */
2050 
2051  return true;
2052 }
uint32 TransactionId
Definition: c.h:575
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:429
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
uint32 epoch
Definition: walsender.c:2027

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2226 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2227 {
2228  TimestampTz timeout;
2229 
2230  /* don't bail out if we're doing something that doesn't require timeouts */
2231  if (last_reply_timestamp <= 0)
2232  return;
2233 
2236 
2237  if (wal_sender_timeout > 0 && last_processing >= timeout)
2238  {
2239  /*
2240  * Since typically expiration of replication timeout means
2241  * communication problem, we don't send the error message to the
2242  * standby.
2243  */
2245  (errmsg("terminating walsender process due to replication timeout")));
2246 
2247  WalSndShutdown();
2248  }
2249 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:162
#define COMMERROR
Definition: elog.h:30
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:155
int errmsg(const char *fmt,...)
Definition: elog.c:915
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2182 of file walsender.c.

References last_reply_timestamp, sleeptime, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2183 {
2184  long sleeptime = 10000; /* 10 s */
2185 
2187  {
2188  TimestampTz wakeup_time;
2189 
2190  /*
2191  * At the latest stop sleeping once wal_sender_timeout has been
2192  * reached.
2193  */
2196 
2197  /*
2198  * If no ping has been sent yet, wakeup when it's time to do so.
2199  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2200  * the timeout passed without a response.
2201  */
2204  wal_sender_timeout / 2);
2205 
2206  /* Compute relative time until wakeup. */
2207  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2208  }
2209 
2210  return sleeptime;
2211 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
int sleeptime
Definition: pg_standby.c:41
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1691

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2918 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2919 {
2920  XLogRecPtr replicatedPtr;
2921 
2922  /* ... let's just be real sure we're caught up ... */
2923  send_data();
2924 
2925  /*
2926  * To figure out whether all WAL has successfully been replicated, check
2927  * flush location if valid, write otherwise. Tools like pg_receivewal will
2928  * usually (unless in synchronous mode) return an invalid flush location.
2929  */
2930  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2932 
2933  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2934  !pq_is_send_pending())
2935  {
2936  QueryCompletion qc;
2937 
2938  /* Inform the standby that XLOG streaming is done */
2939  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2940  EndCommand(&qc, DestRemote, false);
2941  pq_flush();
2942 
2943  proc_exit(0);
2944  }
2946  WalSndKeepalive(true);
2947 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:39
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3451
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 295 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

296 {
300 
301  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303 
304  if (MyReplicationSlot != NULL)
306 
308 
309  replication_active = false;
310 
311  /*
312  * If there is a transaction in progress, it will clean up our
313  * ResourceOwner, but if a replication command set up a resource owner
314  * without a transaction, we've got to clean that up now.
315  */
317  WalSndResourceCleanup(false);
318 
319  if (got_STOPPING || got_SIGUSR2)
320  proc_exit(0);
321 
322  /* Revert back to startup state */
324 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:811
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4703
WALOpenSegment seg
Definition: xlogreader.h:215
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:484
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1512
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:330
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3201
void ReplicationSlotCleanup(void)
Definition: slot.c:540
void LWLockReleaseAll(void)
Definition: lwlock.c:1909

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3220 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3221 {
3222  switch (state)
3223  {
3224  case WALSNDSTATE_STARTUP:
3225  return "startup";
3226  case WALSNDSTATE_BACKUP:
3227  return "backup";
3228  case WALSNDSTATE_CATCHUP:
3229  return "catchup";
3230  case WALSNDSTATE_STREAMING:
3231  return "streaming";
3232  case WALSNDSTATE_STOPPING:
3233  return "stopping";
3234  }
3235  return "UNKNOWN";
3236 }
Definition: regguts.h:298

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3137 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3138 {
3139  int i;
3140 
3141  for (i = 0; i < max_wal_senders; i++)
3142  {
3143  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3144  pid_t pid;
3145 
3146  SpinLockAcquire(&walsnd->mutex);
3147  pid = walsnd->pid;
3148  SpinLockRelease(&walsnd->mutex);
3149 
3150  if (pid == 0)
3151  continue;
3152 
3154  }
3155 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:256
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3451 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, and waiting_for_ping_response.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3452 {
3453  elog(DEBUG2, "sending replication keepalive");
3454 
3455  /* construct the message... */
3457  pq_sendbyte(&output_message, 'k');
3460  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3461 
3462  /* ... and send it wrapped in CopyData */
3464 
3465  /* Set local flag */
3466  if (requestReply)
3468 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
static StringInfoData output_message
Definition: walsender.c:157
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
#define elog(elevel,...)
Definition: elog.h:228
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3474 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3475 {
3476  TimestampTz ping_time;
3477 
3478  /*
3479  * Don't send keepalive messages if timeouts are globally disabled or
3480  * we're doing something not partaking in timeouts.
3481  */
3482  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3483  return;
3484 
3486  return;
3487 
3488  /*
3489  * If half of wal_sender_timeout has lapsed without receiving any reply
3490  * from the standby, send a keep-alive message to the standby requesting
3491  * an immediate reply.
3492  */
3494  wal_sender_timeout / 2);
3495  if (last_processing >= ping_time)
3496  {
3497  WalSndKeepalive(true);
3498 
3499  /* Try to flush pending output to the client */
3500  if (pq_flush_if_writable() != 0)
3501  WalSndShutdown();
3502  }
3503 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3451
static TimestampTz last_processing
Definition: walsender.c:162
#define pq_flush_if_writable()
Definition: libpq.h:40
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2441 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2442 {
2443  WalSnd *walsnd = MyWalSnd;
2444 
2445  Assert(walsnd != NULL);
2446 
2447  MyWalSnd = NULL;
2448 
2449  SpinLockAcquire(&walsnd->mutex);
2450  /* clear latch while holding the spinlock, so it can safely be read */
2451  walsnd->latch = NULL;
2452  /* Mark WalSnd struct as no longer being in use. */
2453  walsnd->pid = 0;
2454  SpinLockRelease(&walsnd->mutex);
2455 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:792

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3033 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

3034 {
3035  int save_errno = errno;
3036 
3037  got_SIGUSR2 = true;
3038  SetLatch(MyLatch);
3039 
3040  errno = save_errno;
3041 }
void SetLatch(Latch *latch)
Definition: latch.c:505
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
struct Latch * MyLatch
Definition: globals.c:55

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2253 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

2254 {
2255  /*
2256  * Initialize the last reply timestamp. That enables timeout processing
2257  * from hereon.
2258  */
2260  waiting_for_ping_response = false;
2261 
2262  /*
2263  * Loop until we reach the end of this timeline or the client requests to
2264  * stop streaming.
2265  */
2266  for (;;)
2267  {
2268  /* Clear any already-pending wakeups */
2270 
2272 
2273  /* Process any requests or signals received recently */
2274  if (ConfigReloadPending)
2275  {
2276  ConfigReloadPending = false;
2279  }
2280 
2281  /* Check for input from the client */
2283 
2284  /*
2285  * If we have received CopyDone from the client, sent CopyDone
2286  * ourselves, and the output buffer is empty, it's time to exit
2287  * streaming.
2288  */
2290  !pq_is_send_pending())
2291  break;
2292 
2293  /*
2294  * If we don't have any pending data in the output buffer, try to send
2295  * some more. If there is some, we don't bother to call send_data
2296  * again until we've flushed it ... but we'd better assume we are not
2297  * caught up.
2298  */
2299  if (!pq_is_send_pending())
2300  send_data();
2301  else
2302  WalSndCaughtUp = false;
2303 
2304  /* Try to flush pending output to the client */
2305  if (pq_flush_if_writable() != 0)
2306  WalSndShutdown();
2307 
2308  /* If nothing remains to be sent right now ... */
2310  {
2311  /*
2312  * If we're in catchup state, move to streaming. This is an
2313  * important state change for users to know about, since before
2314  * this point data loss might occur if the primary dies and we
2315  * need to failover to the standby. The state change is also
2316  * important for synchronous replication, since commits that
2317  * started to wait at that point might wait for some time.
2318  */
2320  {
2321  ereport(DEBUG1,
2322  (errmsg("\"%s\" has now caught up with upstream server",
2323  application_name)));
2325  }
2326 
2327  /*
2328  * When SIGUSR2 arrives, we send any outstanding logs up to the
2329  * shutdown checkpoint record (i.e., the latest record), wait for
2330  * them to be replicated to the standby, and exit. This may be a
2331  * normal termination at shutdown, or a promotion, the walsender
2332  * is not sure which.
2333  */
2334  if (got_SIGUSR2)
2335  WalSndDone(send_data);
2336  }
2337 
2338  /* Check for replication timeout. */
2340 
2341  /* Send keepalive if the time has come */
2343 
2344  /*
2345  * Block if we have unsent data. XXX For logical replication, let
2346  * WalSndWaitForWal() handle any other blocking; idle receivers need
2347  * its additional actions. For physical replication, also block if
2348  * caught up; its send_data does not block.
2349  */
2350  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2353  {
2354  long sleeptime;
2355  int wakeEvents;
2356 
2357  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT;
2358 
2360  wakeEvents |= WL_SOCKET_READABLE;
2361 
2362  /*
2363  * Use fresh timestamp, not last_processing, to reduce the chance
2364  * of reaching wal_sender_timeout before sending a keepalive.
2365  */
2367 
2368  if (pq_is_send_pending())
2369  wakeEvents |= WL_SOCKET_WRITEABLE;
2370 
2371  /* Sleep until something happens or we time out */
2372  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2373  MyProcPort->sock, sleeptime,
2375  }
2376  }
2377 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define DEBUG1
Definition: elog.h:25
struct Port * MyProcPort
Definition: globals.c:44
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2918
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:588
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SyncRepInitConfig(void)
Definition: syncrep.c:412
static bool WalSndCaughtUp
Definition: walsender.c:183
@ PGC_SIGHUP
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3474
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2226
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
#define ereport(elevel,...)
Definition: elog.h:155
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2182
void WalSndSetState(WalSndState state)
Definition: walsender.c:3201
static void XLogSendLogical(void)
Definition: walsender.c:2838
static bool streamingDoneReceiving
Definition: walsender.c:180
char * application_name
Definition: guc.c:565
int errmsg(const char *fmt,...)
Definition: elog.c:915
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:55
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1714
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1233 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1234 {
1235  /* can't have sync rep confused by sending the same LSN several times */
1236  if (!last_write)
1237  lsn = InvalidXLogRecPtr;
1238 
1239  resetStringInfo(ctx->out);
1240 
1241  pq_sendbyte(ctx->out, 'w');
1242  pq_sendint64(ctx->out, lsn); /* dataStart */
1243  pq_sendint64(ctx->out, lsn); /* walEnd */
1244 
1245  /*
1246  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1247  * reserve space here.
1248  */
1249  pq_sendint64(ctx->out, 0); /* sendtime */
1250 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
StringInfo out
Definition: logical.h:70

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 330 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

331 {
332  ResourceOwner resowner;
333 
334  if (CurrentResourceOwner == NULL)
335  return;
336 
337  /*
338  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
339  * in a local variable and clear it first.
340  */
341  resowner = CurrentResourceOwner;
342  CurrentResourceOwner = NULL;
343 
344  /* Now we can release resources and delete it. */
345  ResourceOwnerRelease(resowner,
346  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
347  ResourceOwnerRelease(resowner,
348  RESOURCE_RELEASE_LOCKS, isCommit, true);
349  ResourceOwnerRelease(resowner,
350  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
351  ResourceOwnerDelete(resowner);
352 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:144
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:727
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:482

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2988 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2989 {
2990  int i;
2991 
2992  for (i = 0; i < max_wal_senders; i++)
2993  {
2994  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2995 
2996  SpinLockAcquire(&walsnd->mutex);
2997  if (walsnd->pid == 0)
2998  {
2999  SpinLockRelease(&walsnd->mutex);
3000  continue;
3001  }
3002  walsnd->needreload = true;
3003  SpinLockRelease(&walsnd->mutex);
3004  }
3005 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState * state,
XLogSegNo  nextSegNo,
TimeLineID * tli_p 
)
static

Definition at line 2459 of file walsender.c.

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

2461 {
2462  char path[MAXPGPATH];
2463 
2464  /*-------
2465  * When reading from a historic timeline, and there is a timeline switch
2466  * within this segment, read from the WAL segment belonging to the new
2467  * timeline.
2468  *
2469  * For example, imagine that this server is currently on timeline 5, and
2470  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2471  * 0/13002088. In pg_wal, we have these files:
2472  *
2473  * ...
2474  * 000000040000000000000012
2475  * 000000040000000000000013
2476  * 000000050000000000000013
2477  * 000000050000000000000014
2478  * ...
2479  *
2480  * In this situation, when requested to send the WAL from segment 0x13, on
2481  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2482  * recovery prefers files from newer timelines, so if the segment was
2483  * restored from the archive on this server, the file belonging to the old
2484  * timeline, 000000040000000000000013, might not exist. Their contents are
2485  * equal up to the switchpoint, because at a timeline switch, the used
2486  * portion of the old segment is copied to the new file. -------
2487  */
2488  *tli_p = sendTimeLine;
2490  {
2491  XLogSegNo endSegNo;
2492 
2494  if (nextSegNo == endSegNo)
2495  *tli_p = sendTimeLineNextTLI;
2496  }
2497 
2498  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2499  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2500  if (state->seg.ws_file >= 0)
2501  return;
2502 
2503  /*
2504  * If the file is not found, assume it's because the standby asked for a
2505  * too old WAL segment that has already been removed or recycled.
2506  */
2507  if (errno == ENOENT)
2508  {
2509  char xlogfname[MAXFNAMELEN];
2510  int save_errno = errno;
2511 
2512  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2513  errno = save_errno;
2514  ereport(ERROR,
2516  errmsg("requested WAL segment %s has already been removed",
2517  xlogfname)));
2518  }
2519  else
2520  ereport(ERROR,
2522  errmsg("could not open file \"%s\": %m",
2523  path)));
2524 }
int wal_segment_size
Definition: xlog.c:118
#define PG_BINARY
Definition: c.h:1259
WALOpenSegment seg
Definition: xlogreader.h:215
#define ERROR
Definition: elog.h:45
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:727
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:155
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1014
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:915
WALSegmentContext segcxt
Definition: xlogreader.h:214
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3201 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3202 {
3203  WalSnd *walsnd = MyWalSnd;
3204 
3206 
3207  if (walsnd->state == state)
3208  return;
3209 
3210  SpinLockAcquire(&walsnd->mutex);
3211  walsnd->state = state;
3212  SpinLockRelease(&walsnd->mutex);
3213 }
slock_t mutex
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:792
Definition: regguts.h:298

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3076 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3077 {
3078  bool found;
3079  int i;
3080 
3081  WalSndCtl = (WalSndCtlData *)
3082  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3083 
3084  if (!found)
3085  {
3086  /* First time through, so initialize */
3088 
3089  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3091 
3092  for (i = 0; i < max_wal_senders; i++)
3093  {
3094  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3095 
3096  SpinLockInit(&walsnd->mutex);
3097  }
3098  }
3099 }
Size WalSndShmemSize(void)
Definition: walsender.c:3064
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:996
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3064 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3065 {
3066  Size size = 0;
3067 
3068  size = offsetof(WalSndCtlData, walsnds);
3069  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3070 
3071  return size;
3072 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:528
#define offsetof(type, field)
Definition: c.h:715

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

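Definition at line 229 of file walsender.c.

Note: the listing reproduced below is not the body of WalSndShutdown(). It starts at source line 262 rather than 229. Its comments also describe walsender initialization: creating the per-walsender data structure in shared memory, telling the postmaster that this process is a WAL sender, and initializing the lag-tracking timestamp buffer. It appears to have been attached to this entry by mistake during extraction.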

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndResourceCleanup(), WalSndWaitForWal(), and WalSndWriteData().

262 {
264 
265  /* Create a per-walsender data structure in shared memory */
267 
268  /*
269  * We don't currently need any ResourceOwner in a walsender process, but
270  * if we did, we could call CreateAuxProcessResourceOwner here.
271  */
272 
273  /*
274  * Let postmaster know that we're a WAL sender. Once we've declared us as
275  * a WAL sender process, postmaster will let us outlive the bgwriter and
276  * kill us last in the shutdown sequence, so we get a chance to stream all
277  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
278  * there's no going back, and we mustn't write any WAL records after this.
279  */
282 
283  /* Initialize empty timestamp buffer for lag tracking. */
285 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2381
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:307
bool RecoveryInProgress(void)
Definition: xlog.c:8148
static LagTracker * lag_tracker
Definition: walsender.c:219
MemoryContext TopMemoryContext
Definition: mcxt.c:44
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
bool am_cascading_walsender
Definition: walsender.c:116

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3045 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3046 {
3047  /* Set up signal handlers */
3049  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3050  pqsignal(SIGTERM, die); /* request shutdown */
3051  /* SIGQUIT handler was already set up by InitPostmasterChild */
3052  InitializeTimeouts(); /* establishes SIGALRM handler */
3055  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3056  * shutdown */
3057 
3058  /* Reset some signals that are accepted by postmaster but not here */
3060 }
void InitializeTimeouts(void)
Definition: timeout.c:435
#define SIGUSR1
Definition: win32_port.h:171
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3033
#define SIGCHLD
Definition: win32_port.h:169
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2891
#define SIGHUP
Definition: win32_port.h:159
#define SIG_IGN
Definition: win32_port.h:156
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:154
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:652
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1349 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1350 {
1351  static TimestampTz sendTime = 0;
1353 
1354  /*
1355  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1356  * avoid flooding the lag tracker when we commit frequently.
1357  */
1358 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1359  if (!TimestampDifferenceExceeds(sendTime, now,
1361  return;
1362 
1363  LagTrackerWrite(lsn, now);
1364  sendTime = now;
1365 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1709
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3512
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1375 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyProcPort, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), sleeptime, Port::sock, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WaitLatchOrSocket(), WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WL_TIMEOUT, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1376 {
1377  int wakeEvents;
1378  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1379 
1380  /*
1381  * Fast path to avoid acquiring the spinlock in case we already know we
1382  * have enough WAL available. This is particularly interesting if we're
1383  * far behind.
1384  */
1385  if (RecentFlushPtr != InvalidXLogRecPtr &&
1386  loc <= RecentFlushPtr)
1387  return RecentFlushPtr;
1388 
1389  /* Get a more recent flush pointer. */
1390  if (!RecoveryInProgress())
1391  RecentFlushPtr = GetFlushRecPtr();
1392  else
1393  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1394 
1395  for (;;)
1396  {
1397  long sleeptime;
1398 
1399  /* Clear any already-pending wakeups */
1401 
1403 
1404  /* Process any requests or signals received recently */
1405  if (ConfigReloadPending)
1406  {
1407  ConfigReloadPending = false;
1410  }
1411 
1412  /* Check for input from the client */
1414 
1415  /*
1416  * If we're shutting down, trigger pending WAL to be written out,
1417  * otherwise we'd possibly end up waiting for WAL that never gets
1418  * written, because walwriter has shut down already.
1419  */
1420  if (got_STOPPING)
1422 
1423  /* Update our idea of the currently flushed position. */
1424  if (!RecoveryInProgress())
1425  RecentFlushPtr = GetFlushRecPtr();
1426  else
1427  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1428 
1429  /*
1430  * If postmaster asked us to stop, don't wait anymore.
1431  *
1432  * It's important to do this check after the recomputation of
1433  * RecentFlushPtr, so we can send all remaining data before shutting
1434  * down.
1435  */
1436  if (got_STOPPING)
1437  break;
1438 
1439  /*
1440  * We only send regular messages to the client for full decoded
1441  * transactions, but a synchronous replication and walsender shutdown
1442  * possibly are waiting for a later location. So, before sleeping, we
1443  * send a ping containing the flush location. If the receiver is
1444  * otherwise idle, this keepalive will trigger a reply. Processing the
1445  * reply will update these MyWalSnd locations.
1446  */
1447  if (MyWalSnd->flush < sentPtr &&
1448  MyWalSnd->write < sentPtr &&
1450  WalSndKeepalive(false);
1451 
1452  /* check whether we're done */
1453  if (loc <= RecentFlushPtr)
1454  break;
1455 
1456  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1457  WalSndCaughtUp = true;
1458 
1459  /*
1460  * Try to flush any pending output to the client.
1461  */
1462  if (pq_flush_if_writable() != 0)
1463  WalSndShutdown();
1464 
1465  /*
1466  * If we have received CopyDone from the client, sent CopyDone
1467  * ourselves, and the output buffer is empty, it's time to exit
1468  * streaming, so fail the current WAL fetch request.
1469  */
1471  !pq_is_send_pending())
1472  break;
1473 
1474  /* die if timeout was reached */
1476 
1477  /* Send keepalive if the time has come */
1479 
1480  /*
1481  * Sleep until something happens or we time out. Also wait for the
1482  * socket becoming writable, if there's still pending output.
1483  * Otherwise we might sit on sendable output data while waiting for
1484  * new WAL to be generated. (But if we have nothing to send, we don't
1485  * want to wake on socket-writable.)
1486  */
1488 
1489  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1491 
1492  if (pq_is_send_pending())
1493  wakeEvents |= WL_SOCKET_WRITEABLE;
1494 
1495  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1496  MyProcPort->sock, sleeptime,
1498  }
1499 
1500  /* reactivate latch so WalSndLoop knows to continue */
1501  SetLatch(MyLatch);
1502  return RecentFlushPtr;
1503 }
#define pq_is_send_pending()
Definition: libpq.h:41
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
struct Port * MyProcPort
Definition: globals.c:44
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define WL_SOCKET_READABLE
Definition: latch.h:125
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8500
int sleeptime
Definition: pg_standby.c:41
bool RecoveryInProgress(void)
Definition: xlog.c:8148
void SetLatch(Latch *latch)
Definition: latch.c:505
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:588
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3451
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11638
bool XLogBackgroundFlush(void)
Definition: xlog.c:3049
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
void SyncRepInitConfig(void)
Definition: syncrep.c:412
static bool WalSndCaughtUp
Definition: walsender.c:183
@ PGC_SIGHUP
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3474
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2226
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2182
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:180
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:55
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
static bool waiting_for_ping_response
Definition: walsender.c:171
#define WL_LATCH_SET
Definition: latch.h:124
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1714
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3163 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3164 {
3165  for (;;)
3166  {
3167  int i;
3168  bool all_stopped = true;
3169 
3170  for (i = 0; i < max_wal_senders; i++)
3171  {
3172  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3173 
3174  SpinLockAcquire(&walsnd->mutex);
3175 
3176  if (walsnd->pid == 0)
3177  {
3178  SpinLockRelease(&walsnd->mutex);
3179  continue;
3180  }
3181 
3182  if (walsnd->state != WALSNDSTATE_STOPPING)
3183  {
3184  all_stopped = false;
3185  SpinLockRelease(&walsnd->mutex);
3186  break;
3187  }
3188  SpinLockRelease(&walsnd->mutex);
3189  }
3190 
3191  /* safe to leave if confirmation is done for all WAL senders */
3192  if (all_stopped)
3193  return;
3194 
3195  pg_usleep(10000L); /* wait for 10 msec */
3196  }
3197 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3108 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3109 {
3110  int i;
3111 
3112  for (i = 0; i < max_wal_senders; i++)
3113  {
3114  Latch *latch;
3115  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3116 
3117  /*
3118  * Get latch pointer with spinlock held, for the unlikely case that
3119  * pointer reads aren't atomic (as they're 8 bytes).
3120  */
3121  SpinLockAcquire(&walsnd->mutex);
3122  latch = walsnd->latch;
3123  SpinLockRelease(&walsnd->mutex);
3124 
3125  if (latch != NULL)
3126  SetLatch(latch);
3127  }
3128 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:505
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext * ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1260 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, MyProcPort, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), sleeptime, Port::sock, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, WaitLatchOrSocket(), wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and WL_TIMEOUT.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1262 {
1263  TimestampTz now;
1264 
1265  /*
1266  * Fill the send timestamp last, so that it is taken as late as possible.
1267  * This is somewhat ugly, but the protocol is set as it's already used for
1268  * several releases by streaming physical replication.
1269  */
1271  now = GetCurrentTimestamp();
1272  pq_sendint64(&tmpbuf, now);
1273  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1274  tmpbuf.data, sizeof(int64));
1275 
1276  /* output previously gathered data in a CopyData packet */
1277  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1278 
1280 
1281  /* Try to flush pending output to the client */
1282  if (pq_flush_if_writable() != 0)
1283  WalSndShutdown();
1284 
1285  /* Try taking fast path unless we get too close to walsender timeout. */
1287  wal_sender_timeout / 2) &&
1288  !pq_is_send_pending())
1289  {
1290  return;
1291  }
1292 
1293  /* If we have pending write here, go to slow path */
1294  for (;;)
1295  {
1296  int wakeEvents;
1297  long sleeptime;
1298 
1299  /* Check for input from the client */
1301 
1302  /* die if timeout was reached */
1304 
1305  /* Send keepalive if the time has come */
1307 
1308  if (!pq_is_send_pending())
1309  break;
1310 
1312 
1313  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1315 
1316  /* Sleep until something happens or we time out */
1317  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1318  MyProcPort->sock, sleeptime,
1320 
1321  /* Clear any already-pending wakeups */
1323 
1325 
1326  /* Process any requests or signals received recently */
1327  if (ConfigReloadPending)
1328  {
1329  ConfigReloadPending = false;
1332  }
1333 
1334  /* Try to flush pending output to the client */
1335  if (pq_flush_if_writable() != 0)
1336  WalSndShutdown();
1337  }
1338 
1339  /* reactivate latch so WalSndLoop knows to continue */
1340  SetLatch(MyLatch);
1341 }
#define pq_is_send_pending()
Definition: libpq.h:41
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
int wal_sender_timeout
Definition: walsender.c:123
struct Port * MyProcPort
Definition: globals.c:44
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:125
int sleeptime
Definition: pg_standby.c:41
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:505
pgsocket sock
Definition: libpq-be.h:122
void ResetLatch(Latch *latch)
Definition: latch.c:588
#define pq_flush_if_writable()
Definition: libpq.h:40
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void SyncRepInitConfig(void)
Definition: syncrep.c:412
@ PGC_SIGHUP
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3474
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2226
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2182
static StringInfoData tmpbuf
Definition: walsender.c:159
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
StringInfo out
Definition: logical.h:70
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:55
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
#define WL_LATCH_SET
Definition: latch.h:124
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1714
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2838 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

2839 {
2840  XLogRecord *record;
2841  char *errm;
2842 
2843  /*
2844  * We'll use the current flush point to determine whether we've caught up.
2845  * This variable is static in order to cache it across calls. Caching is
2846  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2847  * spinlock.
2848  */
2849  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2850 
2851  /*
2852  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2853  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2854  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2855  * didn't wait - i.e. when we're shutting down.
2856  */
2857  WalSndCaughtUp = false;
2858 
2859  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2860 
2861  /* xlog record was invalid */
2862  if (errm != NULL)
2863  elog(ERROR, "%s", errm);
2864 
2865  if (record != NULL)
2866  {
2867  /*
2868  * Note the lack of any call to LagTrackerWrite() which is handled by
2869  * WalSndUpdateProgress which is called by output plugin through
2870  * logical decoding write api.
2871  */
2873 
2875  }
2876 
2877  /*
2878  * If first time through in this session, initialize flushPtr. Otherwise,
2879  * we only need to update flushPtr if EndRecPtr is past it.
2880  */
2881  if (flushPtr == InvalidXLogRecPtr)
2882  flushPtr = GetFlushRecPtr();
2883  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2884  flushPtr = GetFlushRecPtr();
2885 
2886  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2887  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2888  WalSndCaughtUp = true;
2889 
2890  /*
2891  * If we're caught up and have been requested to stop, have WalSndLoop()
2892  * terminate the connection in an orderly manner, after writing out all
2893  * the pending data.
2894  */
2896  got_SIGUSR2 = true;
2897 
2898  /* Update shared memory status */
2899  {
2900  WalSnd *walsnd = MyWalSnd;
2901 
2902  SpinLockAcquire(&walsnd->mutex);
2903  walsnd->sentPtr = sentPtr;
2904  SpinLockRelease(&walsnd->mutex);
2905  }
2906 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8500
slock_t mutex
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:268
#define ERROR
Definition: elog.h:45
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:105
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: