PostgreSQL Source Code  git master
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/printtup.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
static void IdentifySystem (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static bool logical_read_xlog_page (XLogReaderState *state)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static XLogRecPtr GetStandbyFlushRecPtr (void)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 0
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 207 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

Referenced by XLogSendPhysical().

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

Referenced by pg_stat_get_wal_senders().

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Referenced by WalSndUpdateProgress().

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 225 of file walsender.c.

Function Documentation

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 920 of file walsender.c.

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MemSet, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, sendTimeLine, sendTimeLineIsHistoric, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), CreateReplicationSlotCmd::two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, and XactIsoLevel.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

921 {
922  const char *snapshot_name = NULL;
923  char xloc[MAXFNAMELEN];
924  char *slot_name;
925  bool reserve_wal = false;
926  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
928  TupOutputState *tstate;
929  TupleDesc tupdesc;
930  Datum values[4];
931  bool nulls[4];
932 
934 
935  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
936 
937  /* setup state for WalSndSegmentOpen */
938  sendTimeLineIsHistoric = false;
940 
941  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
942  {
943  ReplicationSlotCreate(cmd->slotname, false,
945  false);
946  }
947  else
948  {
950 
951  /*
952  * Initially create persistent slot as ephemeral - that allows us to
953  * nicely handle errors during initialization because it'll get
954  * dropped if this transaction fails. We'll make it persistent at the
955  * end. Temporary slots can be created as temporary from beginning as
956  * they get dropped on error as well.
957  */
958  ReplicationSlotCreate(cmd->slotname, true,
960  cmd->two_phase);
961  }
962 
963  if (cmd->kind == REPLICATION_KIND_LOGICAL)
964  {
966  bool need_full_snapshot = false;
967 
968  /*
969  * Do options check early so that we can bail before calling the
970  * DecodingContextFindStartpoint which can take a long time.
971  */
972  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
973  {
974  if (IsTransactionBlock())
975  ereport(ERROR,
976  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
977  (errmsg("%s must not be called inside a transaction",
978  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
979 
980  need_full_snapshot = true;
981  }
982  else if (snapshot_action == CRS_USE_SNAPSHOT)
983  {
984  if (!IsTransactionBlock())
985  ereport(ERROR,
986  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
987  (errmsg("%s must be called inside a transaction",
988  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
989 
991  ereport(ERROR,
992  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
993  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
994  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
995 
996  if (FirstSnapshotSet)
997  ereport(ERROR,
998  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
999  (errmsg("%s must be called before any query",
1000  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1001 
1002  if (IsSubTransaction())
1003  ereport(ERROR,
1004  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1005  (errmsg("%s must not be called in a subtransaction",
1006  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1007 
1008  need_full_snapshot = true;
1009  }
1010 
1011  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1017 
1018  /*
1019  * Signal that we don't need the timeout mechanism. We're just
1020  * creating the replication slot and don't yet accept feedback
1021  * messages or send keepalives. As we possibly need to wait for
1022  * further WAL the walsender would otherwise possibly be killed too
1023  * soon.
1024  */
1026 
1027  /* build initial snapshot, might take a while */
1029 
1030  /*
1031  * Export or use the snapshot if we've been asked to do so.
1032  *
1033  * NB. We will convert the snapbuild.c kind of snapshot to normal
1034  * snapshot when doing this.
1035  */
1036  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1037  {
1038  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1039  }
1040  else if (snapshot_action == CRS_USE_SNAPSHOT)
1041  {
1042  Snapshot snap;
1043 
1046  }
1047 
1048  /* don't need the decoding context anymore */
1049  FreeDecodingContext(ctx);
1050 
1051  if (!cmd->temporary)
1053  }
1054  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1055  {
1057 
1059 
1060  /* Write this slot to disk if it's a permanent one. */
1061  if (!cmd->temporary)
1063  }
1064 
1065  snprintf(xloc, sizeof(xloc), "%X/%X",
1067 
1069  MemSet(nulls, false, sizeof(nulls));
1070 
1071  /*----------
1072  * Need a tuple descriptor representing four columns:
1073  * - first field: the slot name
1074  * - second field: LSN at which we became consistent
1075  * - third field: exported snapshot's name
1076  * - fourth field: output plugin
1077  *----------
1078  */
1079  tupdesc = CreateTemplateTupleDesc(4);
1080  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1081  TEXTOID, -1, 0);
1082  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1083  TEXTOID, -1, 0);
1084  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1085  TEXTOID, -1, 0);
1086  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1087  TEXTOID, -1, 0);
1088 
1089  /* prepare for projection of tuples */
1090  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1091 
1092  /* slot_name */
1093  slot_name = NameStr(MyReplicationSlot->data.name);
1094  values[0] = CStringGetTextDatum(slot_name);
1095 
1096  /* consistent wal location */
1097  values[1] = CStringGetTextDatum(xloc);
1098 
1099  /* snapshot name, or NULL if none */
1100  if (snapshot_name != NULL)
1101  values[2] = CStringGetTextDatum(snapshot_name);
1102  else
1103  nulls[2] = true;
1104 
1105  /* plugin, or NULL if none */
1106  if (cmd->plugin != NULL)
1107  values[3] = CStringGetTextDatum(cmd->plugin);
1108  else
1109  nulls[3] = true;
1110 
1111  /* send it to dest */
1112  do_tup_output(tstate, values, nulls);
1113  end_tup_output(tstate);
1114 
1116 }
#define NIL
Definition: pg_list.h:65
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:823
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:867
PGPROC * MyProc
Definition: proc.c:68
#define XACT_REPEATABLE_READ
Definition: xact.h:38
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
#define MemSet(start, val, len)
Definition: c.h:1008
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:228
void ReplicationSlotSave(void)
Definition: slot.c:732
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2226
ReplicationSlotPersistentData data
Definition: slot.h:156
XLogRecPtr confirmed_flush
Definition: slot.h:92
ReplicationKind kind
Definition: replnodes.h:56
bool IsTransactionBlock(void)
Definition: xact.c:4683
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:613
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void ReplicationSlotReserveWal(void)
Definition: slot.c:1091
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:575
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:658
#define ERROR
Definition: elog.h:46
bool FirstSnapshotSet
Definition: snapmgr.c:149
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:767
static bool logical_read_xlog_page(XLogReaderState *state)
Definition: walsender.c:807
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1258
void ReplicationSlotRelease(void)
Definition: slot.c:497
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1231
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:411
TimeLineID ThisTimeLineID
Definition: xlog.c:196
#define ereport(elevel,...)
Definition: elog.h:157
struct SnapBuild * snapshot_builder
Definition: logical.h:46
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define Assert(condition)
Definition: c.h:804
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:624
int XactIsoLevel
Definition: xact.c:75
bool IsSubTransaction(void)
Definition: xact.c:4756
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:521
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
CRSSnapshotAction
Definition: walsender.h:20
#define NameStr(name)
Definition: c.h:681
#define CStringGetTextDatum(s)
Definition: builtins.h:82
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, LogicalDecodingXLogPageReadCB page_read, WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:320
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
#define snprintf
Definition: port.h:216
int16 AttrNumber
Definition: attnum.h:21
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
void ReplicationSlotMarkDirty(void)
Definition: slot.c:750
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1342

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1122 of file walsender.c.

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

1123 {
1124  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1125 }
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:591

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1502 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1503 {
1504  int parse_rc;
1505  Node *cmd_node;
1506  const char *cmdtag;
1507  MemoryContext cmd_context;
1508  MemoryContext old_context;
1509 
1510  /*
1511  * If WAL sender has been told that shutdown is getting close, switch its
1512  * status accordingly to handle the next replication commands correctly.
1513  */
1514  if (got_STOPPING)
1516 
1517  /*
1518  * Throw error if in stopping mode. We need to prevent commands that could
1519  * generate WAL while the shutdown checkpoint is being written. To be
1520  * safe, we just prohibit all new commands.
1521  */
1523  ereport(ERROR,
1524  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1525 
1526  /*
1527  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1528  * command arrives. Clean up the old stuff if there's anything.
1529  */
1531 
1533 
1534  /*
1535  * Parse the command.
1536  */
1538  "Replication command context",
1540  old_context = MemoryContextSwitchTo(cmd_context);
1541 
1542  replication_scanner_init(cmd_string);
1543  parse_rc = replication_yyparse();
1544  if (parse_rc != 0)
1545  ereport(ERROR,
1546  (errcode(ERRCODE_SYNTAX_ERROR),
1547  errmsg_internal("replication command parser returned %d",
1548  parse_rc)));
1550 
1551  cmd_node = replication_parse_result;
1552 
1553  /*
1554  * If it's a SQL command, just clean up our mess and return false; the
1555  * caller will take care of executing it.
1556  */
1557  if (IsA(cmd_node, SQLCmd))
1558  {
1559  if (MyDatabaseId == InvalidOid)
1560  ereport(ERROR,
1561  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1562 
1563  MemoryContextSwitchTo(old_context);
1564  MemoryContextDelete(cmd_context);
1565 
1566  /* Tell the caller that this wasn't a WalSender command. */
1567  return false;
1568  }
1569 
1570  /*
1571  * Report query to various monitoring facilities. For this purpose, we
1572  * report replication commands just like SQL commands.
1573  */
1574  debug_query_string = cmd_string;
1575 
1577 
1578  /*
1579  * Log replication command if log_replication_commands is enabled. Even
1580  * when it's disabled, log the command with DEBUG1 level for backward
1581  * compatibility.
1582  */
1584  (errmsg("received replication command: %s", cmd_string)));
1585 
1586  /*
1587  * Disallow replication commands in aborted transaction blocks.
1588  */
1590  ereport(ERROR,
1591  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1592  errmsg("current transaction is aborted, "
1593  "commands ignored until end of transaction block")));
1594 
1596 
1597  /*
1598  * Allocate buffers that will be used for each outgoing and incoming
1599  * message. We do this just once per command to reduce palloc overhead.
1600  */
1604 
1605  switch (cmd_node->type)
1606  {
1607  case T_IdentifySystemCmd:
1608  cmdtag = "IDENTIFY_SYSTEM";
1609  set_ps_display(cmdtag);
1610  IdentifySystem();
1611  EndReplicationCommand(cmdtag);
1612  break;
1613 
1614  case T_BaseBackupCmd:
1615  cmdtag = "BASE_BACKUP";
1616  set_ps_display(cmdtag);
1617  PreventInTransactionBlock(true, cmdtag);
1618  SendBaseBackup((BaseBackupCmd *) cmd_node);
1619  EndReplicationCommand(cmdtag);
1620  break;
1621 
1623  cmdtag = "CREATE_REPLICATION_SLOT";
1624  set_ps_display(cmdtag);
1626  EndReplicationCommand(cmdtag);
1627  break;
1628 
1630  cmdtag = "DROP_REPLICATION_SLOT";
1631  set_ps_display(cmdtag);
1633  EndReplicationCommand(cmdtag);
1634  break;
1635 
1636  case T_StartReplicationCmd:
1637  {
1638  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1639 
1640  cmdtag = "START_REPLICATION";
1641  set_ps_display(cmdtag);
1642  PreventInTransactionBlock(true, cmdtag);
1643 
1644  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1645  StartReplication(cmd);
1646  else
1648 
1649  /* dupe, but necessary per libpqrcv_endstreaming */
1650  EndReplicationCommand(cmdtag);
1651 
1652  Assert(xlogreader != NULL);
1653  break;
1654  }
1655 
1656  case T_TimeLineHistoryCmd:
1657  cmdtag = "TIMELINE_HISTORY";
1658  set_ps_display(cmdtag);
1659  PreventInTransactionBlock(true, cmdtag);
1661  EndReplicationCommand(cmdtag);
1662  break;
1663 
1664  case T_VariableShowStmt:
1665  {
1667  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1668 
1669  cmdtag = "SHOW";
1670  set_ps_display(cmdtag);
1671 
1672  /* syscache access needs a transaction environment */
1674  GetPGVariable(n->name, dest);
1676  EndReplicationCommand(cmdtag);
1677  }
1678  break;
1679 
1680  default:
1681  elog(ERROR, "unrecognized replication command node tag: %u",
1682  cmd_node->type);
1683  }
1684 
1685  /* done */
1686  MemoryContextSwitchTo(old_context);
1687  MemoryContextDelete(cmd_context);
1688 
1689  /*
1690  * We need not update ps display or pg_stat_activity, because PostgresMain
1691  * will reset those to "idle". But we must reset debug_query_string to
1692  * ensure it doesn't become a dangling pointer.
1693  */
1694  debug_query_string = NULL;
1695 
1696  return true;
1697 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:465
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1122
void CommitTransactionCommand(void)
Definition: xact.c:2939
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:539
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9333
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:83
#define ERROR
Definition: elog.h:46
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:932
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
WalSndState state
NodeTag type
Definition: nodes.h:541
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:571
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3379
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:89
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1132
Oid MyDatabaseId
Definition: globals.c:88
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3210
void StartTransactionCommand(void)
Definition: xact.c:2838
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:920
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define elog(elevel,...)
Definition: elog.h:232
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
static void IdentifySystem(void)
Definition: walsender.c:376
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:674
void replication_scanner_init(const char *query_string)

◆ GetStandbyFlushRecPtr()

static XLogRecPtr GetStandbyFlushRecPtr ( void  )
static

Definition at line 2951 of file walsender.c.

References GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSendPhysical().

2952 {
2953  XLogRecPtr replayPtr;
2954  TimeLineID replayTLI;
2955  XLogRecPtr receivePtr;
2956  TimeLineID receiveTLI;
2957  XLogRecPtr result;
2958 
2959  /*
2960  * We can safely send what's already been replayed. Also, if walreceiver
2961  * is streaming WAL from the same timeline, we can send anything that it
2962  * has streamed, but hasn't been replayed yet.
2963  */
2964 
2965  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2966  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2967 
2968  ThisTimeLineID = replayTLI;
2969 
2970  result = replayPtr;
2971  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2972  result = receivePtr;
2973 
2974  return result;
2975 }
uint32 TimeLineID
Definition: xlogdefs.h:59
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11728
static TimeLineID receiveTLI
Definition: xlog.c:218
TimeLineID ThisTimeLineID
Definition: xlog.c:196
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3004 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3005 {
3006  Assert(am_walsender);
3007 
3008  /*
3009  * If replication has not yet started, die like with SIGTERM. If
3010  * replication is active, only set a flag and wake up the main loop. It
3011  * will send any outstanding WAL, wait for it to be replicated to the
3012  * standby, and then exit gracefully.
3013  */
3014  if (!replication_active)
3015  kill(MyProcPid, SIGTERM);
3016  else
3017  got_STOPPING = true;
3018 }
int MyProcPid
Definition: globals.c:43
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:454
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:804

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 376 of file walsender.c.

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int32GetDatum, InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextSwitchTo(), MemSet, MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), ThisTimeLineID, TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

377 {
378  char sysid[32];
379  char xloc[MAXFNAMELEN];
380  XLogRecPtr logptr;
381  char *dbname = NULL;
382  DestReceiver *dest;
383  TupOutputState *tstate;
384  TupleDesc tupdesc;
385  Datum values[4];
386  bool nulls[4];
387 
388  /*
389  * Reply with a result set with one row, four columns. First col is system
390  * ID, second is timeline ID, third is current xlog location and the
391  * fourth contains the database name if we are connected to one.
392  */
393 
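394  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
395  GetSystemIdentifier());
396 
397  am_cascading_walsender = RecoveryInProgress();
398  if (am_cascading_walsender)
399  {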
400  /* this also updates ThisTimeLineID */
401  logptr = GetStandbyFlushRecPtr();
402  }
403  else
404  logptr = GetFlushRecPtr();
405 
406  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
407 
408  if (MyDatabaseId != InvalidOid)
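409  {
410  MemoryContext cur = CurrentMemoryContext;
411 
412  /* syscache access needs a transaction env. */
413  StartTransactionCommand();
414  /* make dbname live outside TX context */
415  MemoryContextSwitchTo(cur);
416  dbname = get_database_name(MyDatabaseId);
417  CommitTransactionCommand();
418  /* CommitTransactionCommand switches to TopMemoryContext */
419  MemoryContextSwitchTo(cur);
420  }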
421 
422  dest = CreateDestReceiver(DestRemoteSimple);
423  MemSet(nulls, false, sizeof(nulls));
424 
425  /* need a tuple descriptor representing four columns */
426  tupdesc = CreateTemplateTupleDesc(4);
427  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
428  TEXTOID, -1, 0);
429  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
430  INT4OID, -1, 0);
431  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
432  TEXTOID, -1, 0);
433  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
434  TEXTOID, -1, 0);
435 
436  /* prepare for projection of tuples */
437  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
438 
439  /* column 1: system identifier */
440  values[0] = CStringGetTextDatum(sysid);
441 
442  /* column 2: timeline */
443  values[1] = Int32GetDatum(ThisTimeLineID);
444 
445  /* column 3: wal location */
446  values[2] = CStringGetTextDatum(xloc);
447 
448  /* column 4: database name, or NULL if none */
449  if (dbname)
450  values[3] = CStringGetTextDatum(dbname);
451  else
452  nulls[3] = true;
453 
454  /* send it to dest */
455  do_tup_output(tstate, values, nulls);
456 
457  end_tup_output(tstate);
458 }
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void CommitTransactionCommand(void)
Definition: xact.c:2939
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
struct cursor * cur
Definition: ecpg.c:28
#define MemSet(start, val, len)
Definition: c.h:1008
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8589
bool RecoveryInProgress(void)
Definition: xlog.c:8237
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:658
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
#define MAXFNAMELEN
uintptr_t Datum
Definition: postgres.h:411
Oid MyDatabaseId
Definition: globals.c:88
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:196
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void StartTransactionCommand(void)
Definition: xact.c:2838
char * dbname
Definition: streamutil.c:51
static Datum values[MAXATTR]
Definition: bootstrap.c:166
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4965
#define Int32GetDatum(X)
Definition: postgres.h:523
#define CStringGetTextDatum(s)
Definition: builtins.h:82
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2951
#define snprintf
Definition: port.h:216
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:484
bool am_cascading_walsender
Definition: walsender.c:116

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2369 of file walsender.c.

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidXLogRecPtr, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProc, MyProcPid, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, PGPROC::procLatch, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

2370 {
2371  int i;
2372 
2373  /*
2374  * WalSndCtl should be set up already (we inherit this by fork() or
2375  * EXEC_BACKEND mechanism from the postmaster).
2376  */
2377  Assert(WalSndCtl != NULL);
2378  Assert(MyWalSnd == NULL);
2379 
2380  /*
2381  * Find a free walsender slot and reserve it. This must not fail due to
2382  * the prior check for free WAL senders in InitProcess().
2383  */
2384  for (i = 0; i < max_wal_senders; i++)
2385  {
2386  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2387 
2388  SpinLockAcquire(&walsnd->mutex);
2389 
2390  if (walsnd->pid != 0)
2391  {
2392  SpinLockRelease(&walsnd->mutex);
2393  continue;
2394  }
2395  else
2396  {
2397  /*
2398  * Found a free slot. Reserve it for us.
2399  */
2400  walsnd->pid = MyProcPid;
2401  walsnd->state = WALSNDSTATE_STARTUP;
2402  walsnd->sentPtr = InvalidXLogRecPtr;
2403  walsnd->needreload = false;
2404  walsnd->write = InvalidXLogRecPtr;
2405  walsnd->flush = InvalidXLogRecPtr;
2406  walsnd->apply = InvalidXLogRecPtr;
2407  walsnd->writeLag = -1;
2408  walsnd->flushLag = -1;
2409  walsnd->applyLag = -1;
2410  walsnd->sync_standby_priority = 0;
2411  walsnd->latch = &MyProc->procLatch;
2412  walsnd->replyTime = 0;
2413  SpinLockRelease(&walsnd->mutex);
2414  /* don't need the lock anymore */
2415  MyWalSnd = (WalSnd *) walsnd;
2416 
2417  break;
2418  }
2419  }
2420 
2421  Assert(MyWalSnd != NULL);
2422 
2423  /* Arrange to clean up at walsender exit */
2424  on_shmem_exit(WalSndKill, 0);
2425 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
int MyProcPid
Definition: globals.c:43
PGPROC * MyProc
Definition: proc.c:68
TimeOffset flushLag
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2429
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
Latch procLatch
Definition: proc.h:130
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
Latch * latch
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
TimeOffset applyLag
#define Assert(condition)
Definition: c.h:804
int sync_standby_priority
bool needreload
int i
TimestampTz replyTime
XLogRecPtr apply

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 3586 of file walsender.c.

References Assert, LagTracker::buffer, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

3587 {
3588  TimestampTz time = 0;
3589 
3590  /* Read all unread samples up to this LSN or end of buffer. */
3591  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3592  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3593  {
3594  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
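3595  lag_tracker->last_read[head] =
3596  lag_tracker->buffer[lag_tracker->read_heads[head]];
3597  lag_tracker->read_heads[head] =
3598  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3599  }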
3600 
3601  /*
3602  * If the lag tracker is empty, that means the standby has processed
3603  * everything we've ever sent so we should now clear 'last_read'. If we
3604  * didn't do that, we'd risk using a stale and irrelevant sample for
3605  * interpolation at the beginning of the next burst of WAL after a period
3606  * of idleness.
3607  */
3608  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3609  lag_tracker->last_read[head].time = 0;
3610 
3611  if (time > now)
3612  {
3613  /* If the clock somehow went backwards, treat as not found. */
3614  return -1;
3615  }
3616  else if (time == 0)
3617  {
3618  /*
3619  * We didn't cross a time. If there is a future sample that we
3620  * haven't reached yet, and we've already reached at least one sample,
3621  * let's interpolate the local flushed time. This is mainly useful
3622  * for reporting a completely stuck apply position as having
3623  * increasing lag, since otherwise we'd have to wait for it to
3624  * eventually start moving again and cross one of our samples before
3625  * we can show the lag increasing.
3626  */
3627  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3628  {
3629  /* There are no future samples, so we can't interpolate. */
3630  return -1;
3631  }
3632  else if (lag_tracker->last_read[head].time != 0)
3633  {
3634  /* We can interpolate between last_read and the next sample. */
3635  double fraction;
3636  WalTimeSample prev = lag_tracker->last_read[head];
3637  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3638 
3639  if (lsn < prev.lsn)
3640  {
3641  /*
3642  * Reported LSNs shouldn't normally go backwards, but it's
3643  * possible when there is a timeline change. Treat as not
3644  * found.
3645  */
3646  return -1;
3647  }
3648 
3649  Assert(prev.lsn < next.lsn);
3650 
3651  if (prev.time > next.time)
3652  {
3653  /* If the clock somehow went backwards, treat as not found. */
3654  return -1;
3655  }
3656 
3657  /* See how far we are between the previous and next samples. */
3658  fraction =
3659  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3660 
3661  /* Scale the local flush time proportionally. */
3662  time = (TimestampTz)
3663  ((double) prev.time + (next.time - prev.time) * fraction);
3664  }
3665  else
3666  {
3667  /*
3668  * We have only a future sample, implying that we were entirely
3669  * caught up, but now there is a new burst of WAL and the
3670  * standby hasn't processed the first sample yet. Until the
3671  * standby reaches the future sample the best we can do is report
3672  * the hypothetical lag if that sample were to be replayed now.
3673  */
3674  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3675  }
3676  }
3677 
3678  /* Return the elapsed time since local flush time in microseconds. */
3679  Assert(time != 0);
3680  return now - time;
3681 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static int32 next
Definition: blutils.c:219
int write_head
Definition: walsender.c:214
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
XLogRecPtr lsn
Definition: walsender.c:202
#define Assert(condition)
Definition: c.h:804
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 3521 of file walsender.c.

References am_walsender, LagTracker::buffer, i, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

3522 {
3523  bool buffer_full;
3524  int new_write_head;
3525  int i;
3526 
3527  if (!am_walsender)
3528  return;
3529 
3530  /*
3531  * If the lsn hasn't advanced since last time, then do nothing. This way
3532  * we only record a new sample when new WAL has been written.
3533  */
3534  if (lag_tracker->last_lsn == lsn)
3535  return;
3536  lag_tracker->last_lsn = lsn;
3537 
3538  /*
3539  * If advancing the write head of the circular buffer would crash into any
3540  * of the read heads, then the buffer is full. In other words, the
3541  * slowest reader (presumably apply) is the one that controls the release
3542  * of space.
3543  */
3544  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3545  buffer_full = false;
3546  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3547  {
3548  if (new_write_head == lag_tracker->read_heads[i])
3549  buffer_full = true;
3550  }
3551 
3552  /*
3553  * If the buffer is full, for now we just rewind by one slot and overwrite
3554  * the last sample, as a simple (if somewhat uneven) way to lower the
3555  * sampling rate. There may be better adaptive compaction algorithms.
3556  */
3557  if (buffer_full)
3558  {
3559  new_write_head = lag_tracker->write_head;
3560  if (lag_tracker->write_head > 0)
3561  lag_tracker->write_head--;
3562  else
3563  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3564  }
3565 
3566  /* Store a sample at the current write head position. */
3567  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3568  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3569  lag_tracker->write_head = new_write_head;
3570 }
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
int write_head
Definition: walsender.c:214
TimestampTz time
Definition: walsender.c:203
static LagTracker * lag_tracker
Definition: walsender.c:219
bool am_walsender
Definition: walsender.c:115
XLogRecPtr lsn
Definition: walsender.c:202
XLogRecPtr last_lsn
Definition: walsender.c:212
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ logical_read_xlog_page()

static bool logical_read_xlog_page ( XLogReaderState * state )
static

Definition at line 807 of file walsender.c.

References CheckXLogRemoved(), XLogReaderState::currTLI, XLogReaderState::currTLIValidUntil, XLogReaderState::nextTLI, XLogReaderState::readBuf, XLogReaderState::readPagePtr, XLogReaderState::reqLen, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, ThisTimeLineID, wal_segment_close(), WALRead(), WALReadRaiseError(), WalSndSegmentOpen(), WalSndWaitForWal(), WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, XLogReadDetermineTimeline(), and XLogReaderSetInputData().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

808 {
809  XLogRecPtr targetPagePtr = state->readPagePtr;
810  int reqLen = state->reqLen;
811  char *cur_page = state->readBuf;
812  XLogRecPtr flushptr;
813  int count;
814  WALReadError errinfo;
815  XLogSegNo segno;
816 
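817  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
818  sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
819  sendTimeLine = state->currTLI;
820  sendTimeLineValidUpto = state->currTLIValidUntil;
821  sendTimeLineNextTLI = state->nextTLI;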
822 
823  /* make sure we have enough WAL available */
824  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
825 
826  /* fail if not (implies we are going to shut down) */
827  if (flushptr < targetPagePtr + reqLen)
828  {
829  XLogReaderSetInputData(state, -1);
830  return false;
831  }
832 
833  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
834  count = XLOG_BLCKSZ; /* more than one block available */
835  else
836  count = flushptr - targetPagePtr; /* part of the page available */
837 
838  /* now actually read the data, we know it's there */
839  if (!WALRead(state, WalSndSegmentOpen, wal_segment_close,
840  cur_page,
841  targetPagePtr,
842  XLOG_BLCKSZ,
843  state->seg.ws_tli, /* Pass the current TLI because only
844  * WalSndSegmentOpen controls whether new
845  * TLI is needed. */
846  &errinfo))
847  WALReadRaiseError(&errinfo);
848 
849  /*
850  * After reading into the buffer, check that what we read was valid. We do
851  * this after reading, because even though the segment was present when we
852  * opened it, it might get recycled or removed while we read it. The
853  * read() succeeds in that case, but the data we tried to read might
854  * already have been overwritten with new WAL records.
855  */
856  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
857  CheckXLogRemoved(segno, state->seg.ws_tli);
858 
859  XLogReaderSetInputData(state, count);
860  return true;
861 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:823
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:974
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:702
WALOpenSegment seg
Definition: xlogreader.h:243
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3976
XLogRecPtr readPagePtr
Definition: xlogreader.h:187
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:264
TimeLineID nextTLI
Definition: xlogreader.h:270
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1368
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2447
TimeLineID ThisTimeLineID
Definition: xlog.c:196
TimeLineID currTLI
Definition: xlogreader.h:254
bool WALRead(XLogReaderState *state, WALSegmentOpenCB segopenfn, WALSegmentCloseCB segclosefn, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1665
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:242
static void XLogReaderSetInputData(XLogReaderState *state, int32 len)
Definition: xlogreader.h:304
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ offset_to_interval()

static Interval* offset_to_interval ( TimeOffset  offset)
static

Definition at line 3248 of file walsender.c.

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

3249 {
3250  Interval *result = palloc(sizeof(Interval));
3251 
3252  result->month = 0;
3253  result->day = 0;
3254  result->time = offset;
3255 
3256  return result;
3257 }
int32 day
Definition: timestamp.h:47
TimeOffset time
Definition: timestamp.h:45
int32 month
Definition: timestamp.h:48
void * palloc(Size size)
Definition: mcxt.c:1062

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd,
bool * reserve_wal,
CRSSnapshotAction * snapshot_action 
)
static

Definition at line 867 of file walsender.c.

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, and REPLICATION_KIND_PHYSICAL.

Referenced by CreateReplicationSlot().

870 {
871  ListCell *lc;
872  bool snapshot_action_given = false;
873  bool reserve_wal_given = false;
874 
875  /* Parse options */
876  foreach(lc, cmd->options)
877  {
878  DefElem *defel = (DefElem *) lfirst(lc);
879 
880  if (strcmp(defel->defname, "export_snapshot") == 0)
881  {
882  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
883  ereport(ERROR,
884  (errcode(ERRCODE_SYNTAX_ERROR),
885  errmsg("conflicting or redundant options")));
886 
887  snapshot_action_given = true;
888  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
889  CRS_NOEXPORT_SNAPSHOT;
890  }
891  else if (strcmp(defel->defname, "use_snapshot") == 0)
892  {
893  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
894  ereport(ERROR,
895  (errcode(ERRCODE_SYNTAX_ERROR),
896  errmsg("conflicting or redundant options")));
897 
898  snapshot_action_given = true;
899  *snapshot_action = CRS_USE_SNAPSHOT;
900  }
901  else if (strcmp(defel->defname, "reserve_wal") == 0)
902  {
903  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
904  ereport(ERROR,
905  (errcode(ERRCODE_SYNTAX_ERROR),
906  errmsg("conflicting or redundant options")));
907 
908  reserve_wal_given = true;
909  *reserve_wal = true;
910  }
911  else
912  elog(ERROR, "unrecognized option: %s", defel->defname);
913  }
914 }
int errcode(int sqlerrcode)
Definition: elog.c:698
ReplicationKind kind
Definition: replnodes.h:56
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:46
#define ereport(elevel,...)
Definition: elog.h:157
#define lfirst(lc)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
char * defname
Definition: parsenodes.h:746

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3264 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, WalSnd::flushLag, get_call_result_type(), GetUserId(), i, Int32GetDatum, IntervalPGetDatum, is_member_of_role(), IsA, LSNGetDatum, max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, WalSndGetStateString(), WalSndCtlData::walsnds, work_mem, write, WalSnd::write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

3265 {
3266 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3267  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3268  TupleDesc tupdesc;
3269  Tuplestorestate *tupstore;
3270  MemoryContext per_query_ctx;
3271  MemoryContext oldcontext;
3272  SyncRepStandbyData *sync_standbys;
3273  int num_standbys;
3274  int i;
3275 
3276  /* check to see if caller supports us returning a tuplestore */
3277  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3278  ereport(ERROR,
3279  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3280  errmsg("set-valued function called in context that cannot accept a set")));
3281  if (!(rsinfo->allowedModes & SFRM_Materialize))
3282  ereport(ERROR,
3283  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3284  errmsg("materialize mode required, but it is not allowed in this context")));
3285 
3286  /* Build a tuple descriptor for our result type */
3287  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3288  elog(ERROR, "return type must be a row type");
3289 
3290  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3291  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3292 
3293  tupstore = tuplestore_begin_heap(true, false, work_mem);
3294  rsinfo->returnMode = SFRM_Materialize;
3295  rsinfo->setResult = tupstore;
3296  rsinfo->setDesc = tupdesc;
3297 
3298  MemoryContextSwitchTo(oldcontext);
3299 
3300  /*
3301  * Get the currently active synchronous standbys. This could be out of
3302  * date before we're done, but we'll use the data anyway.
3303  */
3304  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3305 
3306  for (i = 0; i < max_wal_senders; i++)
3307  {
3308  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3309  XLogRecPtr sentPtr;
3310  XLogRecPtr write;
3311  XLogRecPtr flush;
3312  XLogRecPtr apply;
3313  TimeOffset writeLag;
3314  TimeOffset flushLag;
3315  TimeOffset applyLag;
3316  int priority;
3317  int pid;
3318  WalSndState state;
3319  TimestampTz replyTime;
3320  bool is_sync_standby;
3321  Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3322  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3323  int j;
3324 
3325  /* Collect data from shared memory */
3326  SpinLockAcquire(&walsnd->mutex);
3327  if (walsnd->pid == 0)
3328  {
3329  SpinLockRelease(&walsnd->mutex);
3330  continue;
3331  }
3332  pid = walsnd->pid;
3333  sentPtr = walsnd->sentPtr;
3334  state = walsnd->state;
3335  write = walsnd->write;
3336  flush = walsnd->flush;
3337  apply = walsnd->apply;
3338  writeLag = walsnd->writeLag;
3339  flushLag = walsnd->flushLag;
3340  applyLag = walsnd->applyLag;
3341  priority = walsnd->sync_standby_priority;
3342  replyTime = walsnd->replyTime;
3343  SpinLockRelease(&walsnd->mutex);
3344 
3345  /*
3346  * Detect whether walsender is/was considered synchronous. We can
3347  * provide some protection against stale data by checking the PID
3348  * along with walsnd_index.
3349  */
3350  is_sync_standby = false;
3351  for (j = 0; j < num_standbys; j++)
3352  {
3353  if (sync_standbys[j].walsnd_index == i &&
3354  sync_standbys[j].pid == pid)
3355  {
3356  is_sync_standby = true;
3357  break;
3358  }
3359  }
3360 
3361  memset(nulls, 0, sizeof(nulls));
3362  values[0] = Int32GetDatum(pid);
3363 
3364  if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3365  {
3366  /*
3367  * Only superusers and members of pg_read_all_stats can see
3368  * details. Other users only get the pid value to know it's a
3369  * walsender, but no details.
3370  */
3371  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3372  }
3373  else
3374  {
3375  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3376 
3377  if (XLogRecPtrIsInvalid(sentPtr))
3378  nulls[2] = true;
3379  values[2] = LSNGetDatum(sentPtr);
3380 
3381  if (XLogRecPtrIsInvalid(write))
3382  nulls[3] = true;
3383  values[3] = LSNGetDatum(write);
3384 
3385  if (XLogRecPtrIsInvalid(flush))
3386  nulls[4] = true;
3387  values[4] = LSNGetDatum(flush);
3388 
3389  if (XLogRecPtrIsInvalid(apply))
3390  nulls[5] = true;
3391  values[5] = LSNGetDatum(apply);
3392 
3393  /*
3394  * Treat a standby such as a pg_basebackup background process
3395  * which always returns an invalid flush location, as an
3396  * asynchronous standby.
3397  */
3398  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3399 
3400  if (writeLag < 0)
3401  nulls[6] = true;
3402  else
3403  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3404 
3405  if (flushLag < 0)
3406  nulls[7] = true;
3407  else
3408  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3409 
3410  if (applyLag < 0)
3411  nulls[8] = true;
3412  else
3413  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3414 
3415  values[9] = Int32GetDatum(priority);
3416 
3417  /*
3418  * More easily understood version of standby state. This is purely
3419  * informational.
3420  *
3421  * In quorum-based sync replication, the role of each standby
3422  * listed in synchronous_standby_names can be changing very
3423  * frequently. Any standbys considered as "sync" at one moment can
3424  * be switched to "potential" ones at the next moment. So, it's
3425  * basically useless to report "sync" or "potential" as their sync
3426  * states. We report just "quorum" for them.
3427  */
3428  if (priority == 0)
3429  values[10] = CStringGetTextDatum("async");
3430  else if (is_sync_standby)
3431  values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3432  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3433  else
3434  values[10] = CStringGetTextDatum("potential");
3435 
3436  if (replyTime == 0)
3437  nulls[11] = true;
3438  else
3439  values[11] = TimestampTzGetDatum(replyTime);
3440  }
3441 
3442  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3443  }
3444 
3445  /* clean up and return the tuplestore */
3446  tuplestore_donestoring(tupstore);
3447 
3448  return (Datum) 0;
3449 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
XLogRecPtr write
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
Oid GetUserId(void)
Definition: miscinit.c:478
#define write(a, b, c)
Definition: win32.h:14
int64 TimestampTz
Definition: timestamp.h:39
uint8 syncrep_method
Definition: syncrep.h:69
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:1008
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
XLogRecPtr flush
#define ERROR
Definition: elog.h:46
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
uintptr_t Datum
Definition: postgres.h:411
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3229
XLogRecPtr sentPtr
int work_mem
Definition: globals.c:124
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:305
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4869
TimeOffset applyLag
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
uint64 XLogRecPtr
Definition: xlogdefs.h:21
int sync_standby_priority
Definition: regguts.h:317
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
WalSndState
Tuplestorestate * setResult
Definition: execnodes.h:310
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:726
static Datum values[MAXATTR]
Definition: bootstrap.c:166
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:303
#define Int32GetDatum(X)
Definition: postgres.h:523
TupleDesc setDesc
Definition: execnodes.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
int i
#define CStringGetTextDatum(s)
Definition: builtins.h:82
TimestampTz replyTime
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3248
XLogRecPtr apply

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 1831 of file walsender.c.

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

1832 {
1833  bool changed = false;
1834  ReplicationSlot *slot = MyReplicationSlot;
1835 
1836  Assert(lsn != InvalidXLogRecPtr);
1837  SpinLockAcquire(&slot->mutex);
1838  if (slot->data.restart_lsn != lsn)
1839  {
1840  changed = true;
1841  slot->data.restart_lsn = lsn;
1842  }
1843  SpinLockRelease(&slot->mutex);
1844 
1845  if (changed)
1846  {
1847  ReplicationSlotMarkDirty();
1848  ReplicationSlotsComputeRequiredLSN();
1849  }
1850 
1851  /*
1852  * One could argue that the slot should be saved to disk now, but that'd
1853  * be energy wasted - the worst lost information can do here is give us
1854  * wrong information in a statistics view - we'll just potentially be more
1855  * conservative in removing files.
1856  */
1857 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
ReplicationSlotPersistentData data
Definition: slot.h:156
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:839
#define SpinLockRelease(lock)
Definition: spin.h:64
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
XLogRecPtr restart_lsn
Definition: slot.h:81
slock_t mutex
Definition: slot.h:129
void ReplicationSlotMarkDirty(void)
Definition: slot.c:750

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 1968 of file walsender.c.

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

1969 {
1970  bool changed = false;
1971  ReplicationSlot *slot = MyReplicationSlot;
1972 
1973  SpinLockAcquire(&slot->mutex);
1974  MyProc->xmin = InvalidTransactionId;
1975 
1976  /*
1977  * For physical replication we don't need the interlock provided by xmin
1978  * and effective_xmin since the consequences of a missed increase are
1979  * limited to query cancellations, so set both at once.
1980  */
1981  if (!TransactionIdIsNormal(slot->data.xmin) ||
1982  !TransactionIdIsNormal(feedbackXmin) ||
1983  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1984  {
1985  changed = true;
1986  slot->data.xmin = feedbackXmin;
1987  slot->effective_xmin = feedbackXmin;
1988  }
1989  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1990  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1991  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1992  {
1993  changed = true;
1994  slot->data.catalog_xmin = feedbackCatalogXmin;
1995  slot->effective_catalog_xmin = feedbackCatalogXmin;
1996  }
1997  SpinLockRelease(&slot->mutex);
1998 
1999  if (changed)
2000  {
2001  ReplicationSlotMarkDirty();
2002  ReplicationSlotsComputeRequiredXmin(false);
2003  }
2004 }
PGPROC * MyProc
Definition: proc.c:68
ReplicationSlotPersistentData data
Definition: slot.h:156
#define SpinLockAcquire(lock)
Definition: spin.h:62
TransactionId effective_xmin
Definition: slot.h:152
TransactionId xmin
Definition: proc.h:138
TransactionId catalog_xmin
Definition: slot.h:78
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xmin
Definition: slot.h:70
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
#define SpinLockRelease(lock)
Definition: spin.h:64
TransactionId effective_catalog_xmin
Definition: slot.h:153
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
slock_t mutex
Definition: slot.h:129
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:789
void ReplicationSlotMarkDirty(void)
Definition: slot.c:750

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 1704 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock, pq_startmsgread(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

1705 {
1706  unsigned char firstchar;
1707  int r;
1708  bool received = false;
1709 
1710  last_processing = GetCurrentTimestamp();
1711 
1712  /*
1713  * If we already received a CopyDone from the frontend, any subsequent
1714  * message is the beginning of a new command, and should be processed in
1715  * the main processing loop.
1716  */
1717  while (!streamingDoneReceiving)
1718  {
1719  pq_startmsgread();
1720  r = pq_getbyte_if_available(&firstchar);
1721  if (r < 0)
1722  {
1723  /* unexpected error or EOF */
1724  ereport(COMMERROR,
1725  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1726  errmsg("unexpected EOF on standby connection")));
1727  proc_exit(0);
1728  }
1729  if (r == 0)
1730  {
1731  /* no data available without blocking */
1732  pq_endmsgread();
1733  break;
1734  }
1735 
1736  /* Read the message contents */
1737  resetStringInfo(&reply_message);
1738  if (pq_getmessage(&reply_message, 0))
1739  {
1740  ereport(COMMERROR,
1741  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1742  errmsg("unexpected EOF on standby connection")));
1743  proc_exit(0);
1744  }
1745 
1746  /* Handle the very limited subset of commands expected in this phase */
1747  switch (firstchar)
1748  {
1749  /*
1750  * 'd' means a standby reply wrapped in a CopyData packet.
1751  */
1752  case 'd':
1753  ProcessStandbyMessage();
1754  received = true;
1755  break;
1756 
1757  /*
1758  * CopyDone means the standby requested to finish streaming.
1759  * Reply with CopyDone, if we had not sent that already.
1760  */
1761  case 'c':
1762  if (!streamingDoneSending)
1763  {
1764  pq_putmessage_noblock('c', NULL, 0);
1765  streamingDoneSending = true;
1766  }
1767 
1768  streamingDoneReceiving = true;
1769  received = true;
1770  break;
1771 
1772  /*
1773  * 'X' means that the standby is closing down the socket.
1774  */
1775  case 'X':
1776  proc_exit(0);
1777 
1778  default:
1779  ereport(FATAL,
1780  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1781  errmsg("invalid standby message type \"%c\"",
1782  firstchar)));
1783  }
1784  }
1785 
1786  /*
1787  * Save the last reply timestamp if we've received at least one reply.
1788  */
1789  if (received)
1790  {
1791  last_reply_timestamp = last_processing;
1792  waiting_for_ping_response = false;
1793  }
1794 }
static void ProcessStandbyMessage(void)
Definition: walsender.c:1800
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1150
#define FATAL
Definition: elog.h:49
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1032
static bool streamingDoneSending
Definition: walsender.c:179
#define COMMERROR
Definition: elog.h:30
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1212
static StringInfoData reply_message
Definition: walsender.c:158
void pq_endmsgread(void)
Definition: pqcomm.c:1174
#define ereport(elevel,...)
Definition: elog.h:157
static bool streamingDoneReceiving
Definition: walsender.c:180
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:42
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2048 of file walsender.c.

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

2049 {
2050  TransactionId feedbackXmin;
2051  uint32 feedbackEpoch;
2052  TransactionId feedbackCatalogXmin;
2053  uint32 feedbackCatalogEpoch;
2054  TimestampTz replyTime;
2055 
2056  /*
2057  * Decipher the reply message. The caller already consumed the msgtype
2058  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2059  * of this message.
2060  */
2061  replyTime = pq_getmsgint64(&reply_message);
2062  feedbackXmin = pq_getmsgint(&reply_message, 4);
2063  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2064  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2065  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2066 
2067  if (message_level_is_interesting(DEBUG2))
2068  {
2069  char *replyTimeStr;
2070 
2071  /* Copy because timestamptz_to_str returns a static buffer */
2072  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2073 
2074  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2075  feedbackXmin,
2076  feedbackEpoch,
2077  feedbackCatalogXmin,
2078  feedbackCatalogEpoch,
2079  replyTimeStr);
2080 
2081  pfree(replyTimeStr);
2082  }
2083 
2084  /*
2085  * Update shared state for this WalSender process based on reply data from
2086  * standby.
2087  */
2088  {
2089  WalSnd *walsnd = MyWalSnd;
2090 
2091  SpinLockAcquire(&walsnd->mutex);
2092  walsnd->replyTime = replyTime;
2093  SpinLockRelease(&walsnd->mutex);
2094  }
2095 
2096  /*
2097  * Unset WalSender's xmins if the feedback message values are invalid.
2098  * This happens when the downstream turned hot_standby_feedback off.
2099  */
2100  if (!TransactionIdIsNormal(feedbackXmin)
2101  && !TransactionIdIsNormal(feedbackCatalogXmin))
2102  {
2103  MyProc->xmin = InvalidTransactionId;
2104  if (MyReplicationSlot != NULL)
2105  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2106  return;
2107  }
2108 
2109  /*
2110  * Check that the provided xmin/epoch are sane, that is, not in the future
2111  * and not so far back as to be already wrapped around. Ignore if not.
2112  */
2113  if (TransactionIdIsNormal(feedbackXmin) &&
2114  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2115  return;
2116 
2117  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2118  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2119  return;
2120 
2121  /*
2122  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2123  * the xmin will be taken into account by GetSnapshotData() /
2124  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2125  * thereby prevent the generation of cleanup conflicts on the standby
2126  * server.
2127  *
2128  * There is a small window for a race condition here: although we just
2129  * checked that feedbackXmin precedes nextXid, the nextXid could have
2130  * gotten advanced between our fetching it and applying the xmin below,
2131  * perhaps far enough to make feedbackXmin wrap around. In that case the
2132  * xmin we set here would be "in the future" and have no effect. No point
2133  * in worrying about this since it's too late to save the desired data
2134  * anyway. Assuming that the standby sends us an increasing sequence of
2135  * xmins, this could only happen during the first reply cycle, else our
2136  * own xmin would prevent nextXid from advancing so far.
2137  *
2138  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2139  * is assumed atomic, and there's no real need to prevent concurrent
2140  * horizon determinations. (If we're moving our xmin forward, this is
2141  * obviously safe, and if we're moving it backwards, well, the data is at
2142  * risk already since a VACUUM could already have determined the horizon.)
2143  *
2144  * If we're using a replication slot we reserve the xmin via that,
2145  * otherwise via the walsender's PGPROC entry. We can only track the
2146  * catalog xmin separately when using a slot, so we store the least of the
2147  * two provided when not using a slot.
2148  *
2149  * XXX: It might make sense to generalize the ephemeral slot concept and
2150  * always use the slot mechanism to handle the feedback xmin.
2151  */
2152  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2153  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2154  else
2155  {
2156  if (TransactionIdIsNormal(feedbackCatalogXmin)
2157  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2158  MyProc->xmin = feedbackCatalogXmin;
2159  else
2160  MyProc->xmin = feedbackXmin;
2161  }
2162 }
uint32 TransactionId
Definition: c.h:587
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
slock_t mutex
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2017
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pfree(void *pointer)
Definition: mcxt.c:1169
TransactionId xmin
Definition: proc.h:138
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
#define InvalidTransactionId
Definition: transam.h:31
unsigned int uint32
Definition: c.h:441
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1968
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
TimestampTz replyTime
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 1800 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), and ProcessStandbyReplyMessage().

Referenced by ProcessRepliesIfAny().

1801 {
1802  char msgtype;
1803 
1804  /*
1805  * Check message type from the first byte.
1806  */
1807  msgtype = pq_getmsgbyte(&reply_message);
1808 
1809  switch (msgtype)
1810  {
1811  case 'r':
1812  ProcessStandbyReplyMessage();
1813  break;
1814 
1815  case 'h':
1816  ProcessStandbyHSFeedbackMessage();
1817  break;
1818 
1819  default:
1820  ereport(COMMERROR,
1821  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1822  errmsg("unexpected message type \"%c\"", msgtype)));
1823  proc_exit(0);
1824  }
1825 }
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define COMMERROR
Definition: elog.h:30
static StringInfoData reply_message
Definition: walsender.c:158
#define ereport(elevel,...)
Definition: elog.h:157
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1863
int errmsg(const char *fmt,...)
Definition: elog.c:909
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2048

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 1863 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

1864 {
1865  XLogRecPtr writePtr,
1866  flushPtr,
1867  applyPtr;
1868  bool replyRequested;
1869  TimeOffset writeLag,
1870  flushLag,
1871  applyLag;
1872  bool clearLagTimes;
1873  TimestampTz now;
1874  TimestampTz replyTime;
1875 
1876  static bool fullyAppliedLastTime = false;
1877 
1878  /* the caller already consumed the msgtype byte */
1879  writePtr = pq_getmsgint64(&reply_message);
1880  flushPtr = pq_getmsgint64(&reply_message);
1881  applyPtr = pq_getmsgint64(&reply_message);
1882  replyTime = pq_getmsgint64(&reply_message);
1883  replyRequested = pq_getmsgbyte(&reply_message);
1884 
1885  if (message_level_is_interesting(DEBUG2))
1886  {
1887  char *replyTimeStr;
1888 
1889  /* Copy because timestamptz_to_str returns a static buffer */
1890  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1891 
1892  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1893  LSN_FORMAT_ARGS(writePtr),
1894  LSN_FORMAT_ARGS(flushPtr),
1895  LSN_FORMAT_ARGS(applyPtr),
1896  replyRequested ? " (reply requested)" : "",
1897  replyTimeStr);
1898 
1899  pfree(replyTimeStr);
1900  }
1901 
1902  /* See if we can compute the round-trip lag for these positions. */
1903  now = GetCurrentTimestamp();
1904  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1905  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1906  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1907 
1908  /*
1909  * If the standby reports that it has fully replayed the WAL in two
1910  * consecutive reply messages, then the second such message must result
1911  * from wal_receiver_status_interval expiring on the standby. This is a
1912  * convenient time to forget the lag times measured when it last
1913  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1914  * until more WAL traffic arrives.
1915  */
1916  clearLagTimes = false;
1917  if (applyPtr == sentPtr)
1918  {
1919  if (fullyAppliedLastTime)
1920  clearLagTimes = true;
1921  fullyAppliedLastTime = true;
1922  }
1923  else
1924  fullyAppliedLastTime = false;
1925 
1926  /* Send a reply if the standby requested one. */
1927  if (replyRequested)
1928  WalSndKeepalive(false);
1929 
1930  /*
1931  * Update shared state for this WalSender process based on reply data from
1932  * standby.
1933  */
1934  {
1935  WalSnd *walsnd = MyWalSnd;
1936 
1937  SpinLockAcquire(&walsnd->mutex);
1938  walsnd->write = writePtr;
1939  walsnd->flush = flushPtr;
1940  walsnd->apply = applyPtr;
1941  if (writeLag != -1 || clearLagTimes)
1942  walsnd->writeLag = writeLag;
1943  if (flushLag != -1 || clearLagTimes)
1944  walsnd->flushLag = flushLag;
1945  if (applyLag != -1 || clearLagTimes)
1946  walsnd->applyLag = applyLag;
1947  walsnd->replyTime = replyTime;
1948  SpinLockRelease(&walsnd->mutex);
1949  }
1950 
1951  if (!am_cascading_walsender)
1952  SyncRepReleaseWaiters();
1953 
1954  /*
1955  * Advance our local xmin horizon when the client confirmed a flush.
1956  */
1957  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
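1958  {
1959  if (SlotIsLogical(MyReplicationSlot))
1960  LogicalConfirmReceivedLocation(flushPtr);
1961  else
1962  PhysicalConfirmReceivedLocation(flushPtr);
1963  }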
1964 }
XLogRecPtr write
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1299
TimeOffset flushLag
TimeOffset writeLag
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
XLogRecPtr flush
void pfree(void *pointer)
Definition: mcxt.c:1169
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3460
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1831
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
#define DEBUG2
Definition: elog.h:24
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
#define SlotIsLogical(slot)
Definition: slot.h:178
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
static StringInfoData reply_message
Definition: walsender.c:158
#define SpinLockRelease(lock)
Definition: spin.h:64
int64 TimeOffset
Definition: timestamp.h:40
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3586
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
TimeOffset applyLag
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1676
#define elog(elevel,...)
Definition: elog.h:232
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
TimestampTz replyTime
XLogRecPtr apply
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:441
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
bool am_cascading_walsender
Definition: walsender.c:116
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 465 of file walsender.c.

References buf, CloseTransientFile(), PGAlignedBlock::data, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), read, TimeLineHistoryCmd::timeline, TLHistoryFileName, TLHistoryFilePath, and WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ.

Referenced by exec_replication_command().

466 {
467  StringInfoData buf;
468  char histfname[MAXFNAMELEN];
469  char path[MAXPGPATH];
470  int fd;
471  off_t histfilelen;
472  off_t bytesleft;
473  Size len;
474 
475  /*
476  * Reply with a result set with one row, and two columns. The first col is
477  * the name of the history file, 2nd is the contents.
478  */
479 
480  TLHistoryFileName(histfname, cmd->timeline);
481  TLHistoryFilePath(path, cmd->timeline);
482 
483  /* Send a RowDescription message */
484  pq_beginmessage(&buf, 'T');
485  pq_sendint16(&buf, 2); /* 2 fields */
486 
487  /* first field */
488  pq_sendstring(&buf, "filename"); /* col name */
489  pq_sendint32(&buf, 0); /* table oid */
490  pq_sendint16(&buf, 0); /* attnum */
491  pq_sendint32(&buf, TEXTOID); /* type oid */
492  pq_sendint16(&buf, -1); /* typlen */
493  pq_sendint32(&buf, 0); /* typmod */
494  pq_sendint16(&buf, 0); /* format code */
495 
496  /* second field */
497  pq_sendstring(&buf, "content"); /* col name */
498  pq_sendint32(&buf, 0); /* table oid */
499  pq_sendint16(&buf, 0); /* attnum */
500  pq_sendint32(&buf, TEXTOID); /* type oid */
501  pq_sendint16(&buf, -1); /* typlen */
502  pq_sendint32(&buf, 0); /* typmod */
503  pq_sendint16(&buf, 0); /* format code */
504  pq_endmessage(&buf);
505 
506  /* Send a DataRow message */
507  pq_beginmessage(&buf, 'D');
508  pq_sendint16(&buf, 2); /* # of columns */
509  len = strlen(histfname);
510  pq_sendint32(&buf, len); /* col1 len */
511  pq_sendbytes(&buf, histfname, len);
512 
513  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
514  if (fd < 0)
515  ereport(ERROR,
516  (errcode_for_file_access(),
517  errmsg("could not open file \"%s\": %m", path)));
518 
519  /* Determine file length and send it to client */
520  histfilelen = lseek(fd, 0, SEEK_END);
521  if (histfilelen < 0)
522  ereport(ERROR,
523  (errcode_for_file_access(),
524  errmsg("could not seek to end of file \"%s\": %m", path)));
525  if (lseek(fd, 0, SEEK_SET) != 0)
526  ereport(ERROR,
527  (errcode_for_file_access(),
528  errmsg("could not seek to beginning of file \"%s\": %m", path)));
529 
530  pq_sendint32(&buf, histfilelen); /* col2 len */
531 
532  bytesleft = histfilelen;
533  while (bytesleft > 0)
534  {
535  PGAlignedBlock rbuf;
536  int nread;
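537 
538  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
539  nread = read(fd, rbuf.data, sizeof(rbuf));
540  pgstat_report_wait_end();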
541  if (nread < 0)
542  ereport(ERROR,
543  (errcode_for_file_access(),
544  errmsg("could not read file \"%s\": %m",
545  path)));
546  else if (nread == 0)
547  ereport(ERROR,
548  (errcode(ERRCODE_DATA_CORRUPTED),
549  errmsg("could not read file \"%s\": read %d of %zu",
550  path, nread, (Size) bytesleft)));
551 
552  pq_sendbytes(&buf, rbuf.data, nread);
553  bytesleft -= nread;
554  }
555 
556  if (CloseTransientFile(fd) != 0)
557  ereport(ERROR,
558  (errcode_for_file_access(),
559  errmsg("could not close file \"%s\": %m", path)));
560 
561  pq_endmessage(&buf);
562 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static void pgstat_report_wait_end(void)
Definition: wait_event.h:277
int errcode(int sqlerrcode)
Definition: elog.c:698
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TimeLineID timeline
Definition: replnodes.h:98
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
char data[BLCKSZ]
Definition: c.h:1141
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2423
#define MAXPGPATH
#define TLHistoryFileName(fname, tli)
static char * buf
Definition: pg_test_fsync.c:68
int errcode_for_file_access(void)
Definition: elog.c:721
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:261
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int CloseTransientFile(int fd)
Definition: fd.c:2600
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:157
size_t Size
Definition: c.h:540
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define read(a, b, c)
Definition: win32.h:13
#define TLHistoryFilePath(path, tli)

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1132 of file walsender.c.

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, FreeDecodingContext(), got_STOPPING, LOG, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, SAB_Error, WalSnd::sentPtr, sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XLogBeginRead(), XLogRecPtrIsInvalid, and XLogSendLogical().

Referenced by exec_replication_command().

1133 {
1135  QueryCompletion qc;
1136 
1137  /* make sure that our requirements are still fulfilled */
1139 
1141 
1143 
1145  ereport(ERROR,
1146  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1147  errmsg("cannot read from logical replication slot \"%s\"",
1148  cmd->slotname),
1149  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1150 
1151  /*
1152  * Force a disconnect, so that the decoding code doesn't need to care
1153  * about an eventual switch from running in recovery, to running in a
1154  * normal environment. Client code is expected to handle reconnects.
1155  */
1157  {
1158  ereport(LOG,
1159  (errmsg("terminating walsender process after promotion")));
1160  got_STOPPING = true;
1161  }
1162 
1163  /*
1164  * Create our decoding context, making it start at the previously ack'ed
1165  * position.
1166  *
1167  * Do this before sending a CopyBothResponse message, so that any errors
1168  * are reported early.
1169  */
1171  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1177 
1179 
1180  /* Send a CopyBothResponse message, and start streaming */
1181  pq_beginmessage(&buf, 'W');
1182  pq_sendbyte(&buf, 0);
1183  pq_sendint16(&buf, 0);
1184  pq_endmessage(&buf);
1185  pq_flush();
1186 
1187  /* Start reading WAL from the oldest required WAL. */
1190 
1191  /*
1192  * Report the location after which we'll send out further commits as the
1193  * current sentPtr.
1194  */
1196 
1197  /* Also update the sent position status in shared memory */
1201 
1202  replication_active = true;
1203 
1205 
1206  /* Main loop of walsender */
1208 
1211 
1212  replication_active = false;
1213  if (got_STOPPING)
1214  proc_exit(0);
1216 
1217  /* Get out of COPY mode (CommandComplete). */
1218  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1219  EndCommand(&qc, DestRemote, false);
1220 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr startpoint
Definition: replnodes.h:86
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:823
#define pq_flush()
Definition: libpq.h:37
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, LogicalDecodingXLogPageReadCB page_read, WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:479
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
ReplicationSlotPersistentData data
Definition: slot.h:156
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:8237
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
XLogRecPtr confirmed_flush
Definition: slot.h:92
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:46
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static bool logical_read_xlog_page(XLogReaderState *state)
Definition: walsender.c:807
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
static char * buf
Definition: pg_test_fsync.c:68
int errdetail(const char *fmt,...)
Definition: elog.c:1042
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:252
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1258
void ReplicationSlotRelease(void)
Definition: slot.c:497
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:412
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2243
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1231
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define ereport(elevel,...)
Definition: elog.h:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3210
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:624
static void XLogSendLogical(void)
Definition: walsender.c:2826
XLogRecPtr restart_lsn
Definition: slot.h:81
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int errmsg(const char *fmt,...)
Definition: elog.c:909
XLogReaderState * reader
Definition: logical.h:43
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:388
Definition: slot.h:43
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
bool am_cascading_walsender
Definition: walsender.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1342

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 571 of file walsender.c.

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, MemSet, WalSnd::mutex, MyReplicationSlot, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), proc_exit(), readTimeLineHistory(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), SAB_Error, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

572 {
574  XLogRecPtr FlushPtr;
575 
576  if (ThisTimeLineID == 0)
577  ereport(ERROR,
578  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
579  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
580 
581  /* create xlogreader for physical replication */
582  xlogreader =
584 
585  if (!xlogreader)
586  ereport(ERROR,
587  (errcode(ERRCODE_OUT_OF_MEMORY),
588  errmsg("out of memory")));
589 
590  /*
591  * We assume here that we're logging enough information in the WAL for
592  * log-shipping, since this is checked in PostmasterMain().
593  *
594  * NOTE: wal_level can only change at shutdown, so in most cases it is
595  * difficult for there to be WAL data that we can still see that was
596  * written at wal_level='minimal'.
597  */
598 
599  if (cmd->slotname)
600  {
603  ereport(ERROR,
604  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
605  errmsg("cannot use a logical replication slot for physical replication")));
606 
607  /*
608  * We don't need to verify the slot's restart_lsn here; instead we
609  * rely on the caller requesting the starting point to use. If the
610  * WAL segment doesn't exist, we'll fail later.
611  */
612  }
613 
614  /*
615  * Select the timeline. If it was given explicitly by the client, use
616  * that. Otherwise use the timeline of the last replayed record, which is
617  * kept in ThisTimeLineID.
618  */
620  {
621  /* this also updates ThisTimeLineID */
622  FlushPtr = GetStandbyFlushRecPtr();
623  }
624  else
625  FlushPtr = GetFlushRecPtr();
626 
627  if (cmd->timeline != 0)
628  {
629  XLogRecPtr switchpoint;
630 
631  sendTimeLine = cmd->timeline;
633  {
634  sendTimeLineIsHistoric = false;
636  }
637  else
638  {
639  List *timeLineHistory;
640 
641  sendTimeLineIsHistoric = true;
642 
643  /*
644  * Check that the timeline the client requested exists, and the
645  * requested start location is on that timeline.
646  */
647  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
648  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
650  list_free_deep(timeLineHistory);
651 
652  /*
653  * Found the requested timeline in the history. Check that
654  * requested startpoint is on that timeline in our history.
655  *
656  * This is quite loose on purpose. We only check that we didn't
657  * fork off the requested timeline before the switchpoint. We
658  * don't check that we switched *to* it before the requested
659  * starting point. This is because the client can legitimately
660  * request to start replication from the beginning of the WAL
661  * segment that contains switchpoint, but on the new timeline, so
662  * that it doesn't end up with a partial segment. If you ask for
663  * too old a starting point, you'll get an error later when we
664  * fail to find the requested WAL segment in pg_wal.
665  *
666  * XXX: we could be more strict here and only allow a startpoint
667  * that's older than the switchpoint, if it's still in the same
668  * WAL segment.
669  */
670  if (!XLogRecPtrIsInvalid(switchpoint) &&
671  switchpoint < cmd->startpoint)
672  {
673  ereport(ERROR,
674  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
676  cmd->timeline),
677  errdetail("This server's history forked from timeline %u at %X/%X.",
678  cmd->timeline,
679  LSN_FORMAT_ARGS(switchpoint))));
680  }
681  sendTimeLineValidUpto = switchpoint;
682  }
683  }
684  else
685  {
688  sendTimeLineIsHistoric = false;
689  }
690 
692 
693  /* If there is nothing to stream, don't even enter COPY mode */
695  {
696  /*
697  * When we first start replication the standby will be behind the
698  * primary. For some applications, for example synchronous
699  * replication, it is important to have a clear state for this initial
700  * catchup mode, so we can trigger actions when we change streaming
701  * state later. We may stay in this state for a long time, which is
702  * exactly why we want to be able to monitor whether or not we are
703  * still here.
704  */
706 
707  /* Send a CopyBothResponse message, and start streaming */
708  pq_beginmessage(&buf, 'W');
709  pq_sendbyte(&buf, 0);
710  pq_sendint16(&buf, 0);
711  pq_endmessage(&buf);
712  pq_flush();
713 
714  /*
715  * Don't allow a request to stream from a future point in WAL that
716  * hasn't been flushed to disk in this server yet.
717  */
718  if (FlushPtr < cmd->startpoint)
719  {
720  ereport(ERROR,
721  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
723  LSN_FORMAT_ARGS(FlushPtr))));
724  }
725 
726  /* Start streaming from the requested point */
727  sentPtr = cmd->startpoint;
728 
729  /* Initialize shared memory status, too */
733 
735 
736  /* Main loop of walsender */
737  replication_active = true;
738 
740 
741  replication_active = false;
742  if (got_STOPPING)
743  proc_exit(0);
745 
747  }
748 
749  if (cmd->slotname)
751 
752  /*
753  * Copy is finished now. Send a single-row result set indicating the next
754  * timeline.
755  */
757  {
758  char startpos_str[8 + 1 + 8 + 1];
760  TupOutputState *tstate;
761  TupleDesc tupdesc;
762  Datum values[2];
763  bool nulls[2];
764 
765  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
767 
769  MemSet(nulls, false, sizeof(nulls));
770 
771  /*
772  * Need a tuple descriptor representing two columns. int8 may seem
773  * like a surprising data type for this, but in theory int4 would not
774  * be wide enough for this, as TimeLineID is unsigned.
775  */
776  tupdesc = CreateTemplateTupleDesc(2);
777  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
778  INT8OID, -1, 0);
779  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
780  TEXTOID, -1, 0);
781 
782  /* prepare for projection of tuple */
783  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
784 
785  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
786  values[1] = CStringGetTextDatum(startpos_str);
787 
788  /* send it to dest */
789  do_tup_output(tstate, values, nulls);
790 
791  end_tup_output(tstate);
792  }
793 
794  /* Send CommandComplete message */
795  EndReplicationCommand("START_STREAMING");
796 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr startpoint
Definition: replnodes.h:86
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:823
#define pq_flush()
Definition: libpq.h:37
int wal_segment_size
Definition: xlog.c:121
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, WALSegmentCleanupCB cleanup_cb)
Definition: xlogreader.c:82
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
static void XLogSendPhysical(void)
Definition: walsender.c:2525
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define MemSet(start, val, len)
Definition: c.h:1008
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8589
void list_free_deep(List *list)
Definition: list.c:1405
slock_t mutex
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:658
#define ERROR
Definition: elog.h:46
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
static char * buf
Definition: pg_test_fsync.c:68
static bool streamingDoneSending
Definition: walsender.c:179
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:1042
void ReplicationSlotRelease(void)
Definition: slot.c:497
#define SlotIsLogical(slot)
Definition: slot.h:178
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1700
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:412
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2243
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
#define SpinLockRelease(lock)
Definition: spin.h:64
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:411
XLogRecPtr sentPtr
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
TimeLineID ThisTimeLineID
Definition: xlog.c:196
#define ereport(elevel,...)
Definition: elog.h:157
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3210
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
static bool streamingDoneReceiving
Definition: walsender.c:180
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:388
Definition: slot.h:43
TimeLineID timeline
Definition: replnodes.h:85
#define CStringGetTextDatum(s)
Definition: builtins.h:82
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2951
Definition: pg_list.h:50
#define snprintf
Definition: port.h:216
int16 AttrNumber
Definition: attnum.h:21
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
bool am_cascading_walsender
Definition: walsender.c:116

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2017 of file walsender.c.

References EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

2018 {
      /*
       * Check an (xid, epoch) pair against the next full transaction ID.
       *
       * Returns true only when the epoch is consistent with the next epoch
       * (see the two cases below) and TransactionIdPrecedesOrEquals(xid,
       * nextXid) holds; returns false otherwise.
       */
2019  FullTransactionId nextFullXid;
2020  TransactionId nextXid;
2021  uint32 nextEpoch;
2022 
      /* Split the next full transaction ID into its xid and epoch parts. */
2023  nextFullXid = ReadNextFullTransactionId();
2024  nextXid = XidFromFullTransactionId(nextFullXid);
2025  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2026 
      /*
       * A numerically smaller-or-equal xid must carry the same epoch as the
       * next xid; a numerically larger xid is accepted only if its epoch is
       * exactly one behind the next epoch.
       */
2027  if (xid <= nextXid)
2028  {
2029  if (epoch != nextEpoch)
2030  return false;
2031  }
2032  else
2033  {
2034  if (epoch + 1 != nextEpoch)
2035  return false;
2036  }
2037 
      /*
       * NOTE(review): TransactionIdPrecedesOrEquals() is defined in transam.c
       * and is presumably a wraparound-aware comparison -- confirm there.
       */
2038  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2039  return false; /* epoch OK, but it's wrapped around */
2040 
2041  return true;
2042 }
uint32 TransactionId
Definition: c.h:587
#define XidFromFullTransactionId(x)
Definition: transam.h:48
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
unsigned int uint32
Definition: c.h:441
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static const unsigned __int64 epoch
Definition: gettimeofday.c:34

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2216 of file walsender.c.

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2217 {
2218  TimestampTz timeout;
2219 
2220  /* don't bail out if we're doing something that doesn't require timeouts */
2221  if (last_reply_timestamp <= 0)
2222  return;
2223 
2226 
2227  if (wal_sender_timeout > 0 && last_processing >= timeout)
2228  {
2229  /*
2230  * Since expiration of the replication timeout typically means a
2231  * communication problem, we don't send the error message to the
2232  * standby.
2233  */
2235  (errmsg("terminating walsender process due to replication timeout")));
2236 
2237  WalSndShutdown();
2238  }
2239 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static TimestampTz last_processing
Definition: walsender.c:162
#define COMMERROR
Definition: elog.h:30
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg(const char *fmt,...)
Definition: elog.c:909
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2172 of file walsender.c.

References last_reply_timestamp, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

2173 {
2174  long sleeptime = 10000; /* 10 s */
2175 
2177  {
2178  TimestampTz wakeup_time;
2179 
2180  /*
2181  * At the latest stop sleeping once wal_sender_timeout has been
2182  * reached.
2183  */
2186 
2187  /*
2188  * If no ping has been sent yet, wakeup when it's time to do so.
2189  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2190  * the timeout passed without a response.
2191  */
2194  wal_sender_timeout / 2);
2195 
2196  /* Compute relative time until wakeup. */
2197  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2198  }
2199 
2200  return sleeptime;
2201 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1693

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 2911 of file walsender.c.

References DestRemote, EndCommand(), WalSnd::flush, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

2912 {
2913  XLogRecPtr replicatedPtr;
2914 
2915  /* ... let's just be real sure we're caught up ... */
2916  send_data();
2917 
2918  /*
2919  * To figure out whether all WAL has successfully been replicated, check
2920  * flush location if valid, write otherwise. Tools like pg_receivewal will
2921  * usually (unless in synchronous mode) return an invalid flush location.
2922  */
2923  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2925 
2926  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2927  !pq_is_send_pending())
2928  {
2929  QueryCompletion qc;
2930 
2931  /* Inform the standby that XLOG streaming is done */
2932  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2933  EndCommand(&qc, DestRemote, false);
2934  pq_flush();
2935 
2936  proc_exit(0);
2937  }
2939  WalSndKeepalive(true);
2940 }
#define pq_is_send_pending()
Definition: libpq.h:39
XLogRecPtr write
#define pq_flush()
Definition: libpq.h:37
void proc_exit(int code)
Definition: ipc.c:104
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3460
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
static bool WalSndCaughtUp
Definition: walsender.c:183
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 296 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

297 {
301 
302  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
304 
305  if (MyReplicationSlot != NULL)
307 
309 
310  replication_active = false;
311 
312  /*
313  * If there is a transaction in progress, it will clean up our
314  * ResourceOwner, but if a replication command set up a resource owner
315  * without a transaction, we've got to clean that up now.
316  */
318  WalSndResourceCleanup(false);
319 
320  if (got_STOPPING || got_SIGUSR2)
321  proc_exit(0);
322 
323  /* Revert back to startup state */
325 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:823
static void pgstat_report_wait_end(void)
Definition: wait_event.h:277
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
WALOpenSegment seg
Definition: xlogreader.h:243
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:497
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:331
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3210
void ReplicationSlotCleanup(void)
Definition: slot.c:553
void LWLockReleaseAll(void)
Definition: lwlock.c:1903

◆ WalSndGetStateString()

static const char* WalSndGetStateString ( WalSndState  state)
static

Definition at line 3229 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

3230 {
      /*
       * Map a WalSndState value to a constant lower-case string.  The
       * returned pointer refers to a string literal, so callers must not
       * modify or free it.
       */
3231  switch (state)
3232  {
3233  case WALSNDSTATE_STARTUP:
3234  return "startup";
3235  case WALSNDSTATE_BACKUP:
3236  return "backup";
3237  case WALSNDSTATE_CATCHUP:
3238  return "catchup";
3239  case WALSNDSTATE_STREAMING:
3240  return "streaming";
3241  case WALSNDSTATE_STOPPING:
3242  return "stopping";
3243  }
      /* Reached only when state matches none of the cases above. */
3244  return "UNKNOWN";
3245 }
Definition: regguts.h:317

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3146 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3147 {
3148  int i;
3149 
3150  for (i = 0; i < max_wal_senders; i++)
3151  {
3152  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3153  pid_t pid;
3154 
3155  SpinLockAcquire(&walsnd->mutex);
3156  pid = walsnd->pid;
3157  SpinLockRelease(&walsnd->mutex);
3158 
3159  if (pid == 0)
3160  continue;
3161 
3163  }
3164 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply)
static

Definition at line 3460 of file walsender.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, and waiting_for_ping_response.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), and WalSndWaitForWal().

3461 {
3462  elog(DEBUG2, "sending replication keepalive");
3463 
3464  /* construct the message... */
3466  pq_sendbyte(&output_message, 'k');
3469  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3470 
3471  /* ... and send it wrapped in CopyData */
3473 
3474  /* Set local flag */
3475  if (requestReply)
3477 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static StringInfoData output_message
Definition: walsender.c:157
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define DEBUG2
Definition: elog.h:24
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static XLogRecPtr sentPtr
Definition: walsender.c:154
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:42
#define elog(elevel,...)
Definition: elog.h:232
static bool waiting_for_ping_response
Definition: walsender.c:171

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 3483 of file walsender.c.

References last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3484 {
3485  TimestampTz ping_time;
3486 
3487  /*
3488  * Don't send keepalive messages if timeouts are globally disabled or
3489  * we're doing something not partaking in timeouts.
3490  */
3491  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3492  return;
3493 
3495  return;
3496 
3497  /*
3498  * If half of wal_sender_timeout has lapsed without receiving any reply
3499  * from the standby, send a keep-alive message to the standby requesting
3500  * an immediate reply.
3501  */
3503  wal_sender_timeout / 2);
3504  if (last_processing >= ping_time)
3505  {
3506  WalSndKeepalive(true);
3507 
3508  /* Try to flush pending output to the client */
3509  if (pq_flush_if_writable() != 0)
3510  WalSndShutdown();
3511  }
3512 }
int wal_sender_timeout
Definition: walsender.c:123
int64 TimestampTz
Definition: timestamp.h:39
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3460
static TimestampTz last_processing
Definition: walsender.c:162
#define pq_flush_if_writable()
Definition: libpq.h:38
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2429 of file walsender.c.

References Assert, WalSnd::latch, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

2430 {
      /*
       * Give up this process's WalSnd entry: clear the process-local MyWalSnd
       * pointer, then reset the entry's latch and pid under its spinlock.
       * Neither "code" nor "arg" is used in this body.
       */
2431  WalSnd *walsnd = MyWalSnd;
2432 
2433  Assert(walsnd != NULL);
2434 
      /* Keep the local copy; the global pointer is cleared before the update. */
2435  MyWalSnd = NULL;
2436 
2437  SpinLockAcquire(&walsnd->mutex);
2438  /* clear latch while holding the spinlock, so it can safely be read */
2439  walsnd->latch = NULL;
2440  /* Mark WalSnd struct as no longer being in use. */
2441  walsnd->pid = 0;
2442  SpinLockRelease(&walsnd->mutex);
2443 }
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:804

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3026 of file walsender.c.

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

3027 {
      /*
       * Signal handler: record the request in got_SIGUSR2 and set this
       * process's latch.  All other work is left to code that tests the
       * flag.
       *
       * NOTE(review): presumably installed for SIGUSR2 by WalSndSignals(),
       * going by the flag name -- confirm at the registration site.
       */

      /* Preserve errno so the interrupted code does not see it changed. */
3028  int save_errno = errno;
3029 
3030  got_SIGUSR2 = true;
3031  SetLatch(MyLatch);
3032 
3033  errno = save_errno;
3034 }
void SetLatch(Latch *latch)
Definition: latch.c:567
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
struct Latch * MyLatch
Definition: globals.c:57

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2243 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_MAIN, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

2244 {
2245  /*
2246  * Initialize the last reply timestamp. That enables timeout processing
2247  * from here on.
2248  */
2250  waiting_for_ping_response = false;
2251 
2252  /*
2253  * Loop until we reach the end of this timeline or the client requests to
2254  * stop streaming.
2255  */
2256  for (;;)
2257  {
2258  /* Clear any already-pending wakeups */
2260 
2262 
2263  /* Process any requests or signals received recently */
2264  if (ConfigReloadPending)
2265  {
2266  ConfigReloadPending = false;
2269  }
2270 
2271  /* Check for input from the client */
2273 
2274  /*
2275  * If we have received CopyDone from the client, sent CopyDone
2276  * ourselves, and the output buffer is empty, it's time to exit
2277  * streaming.
2278  */
2280  !pq_is_send_pending())
2281  break;
2282 
2283  /*
2284  * If we don't have any pending data in the output buffer, try to send
2285  * some more. If there is some, we don't bother to call send_data
2286  * again until we've flushed it ... but we'd better assume we are not
2287  * caught up.
2288  */
2289  if (!pq_is_send_pending())
2290  send_data();
2291  else
2292  WalSndCaughtUp = false;
2293 
2294  /* Try to flush pending output to the client */
2295  if (pq_flush_if_writable() != 0)
2296  WalSndShutdown();
2297 
2298  /* If nothing remains to be sent right now ... */
2300  {
2301  /*
2302  * If we're in catchup state, move to streaming. This is an
2303  * important state change for users to know about, since before
2304  * this point data loss might occur if the primary dies and we
2305  * need to failover to the standby. The state change is also
2306  * important for synchronous replication, since commits that
2307  * started to wait at that point might wait for some time.
2308  */
2310  {
2311  ereport(DEBUG1,
2312  (errmsg_internal("\"%s\" has now caught up with upstream server",
2313  application_name)));
2315  }
2316 
2317  /*
2318  * When SIGUSR2 arrives, we send any outstanding logs up to the
2319  * shutdown checkpoint record (i.e., the latest record), wait for
2320  * them to be replicated to the standby, and exit. This may be a
2321  * normal termination at shutdown, or a promotion; the walsender
2322  * is not sure which.
2323  */
2324  if (got_SIGUSR2)
2325  WalSndDone(send_data);
2326  }
2327 
2328  /* Check for replication timeout. */
2330 
2331  /* Send keepalive if the time has come */
2333 
2334  /*
2335  * Block if we have unsent data. XXX For logical replication, let
2336  * WalSndWaitForWal() handle any other blocking; idle receivers need
2337  * its additional actions. For physical replication, also block if
2338  * caught up; its send_data does not block.
2339  */
2340  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2343  {
2344  long sleeptime;
2345  int wakeEvents;
2346 
2348  wakeEvents = WL_SOCKET_READABLE;
2349  else
2350  wakeEvents = 0;
2351 
2352  /*
2353  * Use fresh timestamp, not last_processing, to reduce the chance
2354  * of reaching wal_sender_timeout before sending a keepalive.
2355  */
2357 
2358  if (pq_is_send_pending())
2359  wakeEvents |= WL_SOCKET_WRITEABLE;
2360 
2361  /* Sleep until something happens or we time out */
2362  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2363  }
2364  }
2365 }
#define pq_is_send_pending()
Definition: libpq.h:39
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3129
#define DEBUG1
Definition: elog.h:25
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2911
#define WL_SOCKET_READABLE
Definition: latch.h:126
void ResetLatch(Latch *latch)
Definition: latch.c:660
WalSndState state
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:38
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
void SyncRepInitConfig(void)
Definition: syncrep.c:412
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3483
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2216
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2172
void WalSndSetState(WalSndState state)
Definition: walsender.c:3210
static void XLogSendLogical(void)
Definition: walsender.c:2826
static bool streamingDoneReceiving
Definition: walsender.c:180
char * application_name
Definition: guc.c:590
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
static bool waiting_for_ping_response
Definition: walsender.c:171
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1704

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1231 of file walsender.c.

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1232 {
1233  /* can't have sync rep confused by sending the same LSN several times */
1234  if (!last_write)
1235  lsn = InvalidXLogRecPtr;
1236 
1237  resetStringInfo(ctx->out);
1238 
1239  pq_sendbyte(ctx->out, 'w');
1240  pq_sendint64(ctx->out, lsn); /* dataStart */
1241  pq_sendint64(ctx->out, lsn); /* walEnd */
1242 
1243  /*
1244  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1245  * reserve space here.
1246  */
1247  pq_sendint64(ctx->out, 0); /* sendtime */
1248 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
StringInfo out
Definition: logical.h:73

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 331 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

332 {
333  ResourceOwner resowner;
334 
335  if (CurrentResourceOwner == NULL)
336  return;
337 
338  /*
339  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340  * in a local variable and clear it first.
341  */
342  resowner = CurrentResourceOwner;
343  CurrentResourceOwner = NULL;
344 
345  /* Now we can release resources and delete it. */
346  ResourceOwnerRelease(resowner,
347  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348  ResourceOwnerRelease(resowner,
349  RESOURCE_RELEASE_LOCKS, isCommit, true);
350  ResourceOwnerRelease(resowner,
351  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352  ResourceOwnerDelete(resowner);
353 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2981 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2982 {
2983  int i;
2984 
2985  for (i = 0; i < max_wal_senders; i++)
2986  {
2987  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2988 
2989  SpinLockAcquire(&walsnd->mutex);
2990  if (walsnd->pid == 0)
2991  {
2992  SpinLockRelease(&walsnd->mutex);
2993  continue;
2994  }
2995  walsnd->needreload = true;
2996  SpinLockRelease(&walsnd->mutex);
2997  }
2998 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2447 of file walsender.c.

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, XLByteToSeg, XLogFileName, and XLogFilePath.

Referenced by logical_read_xlog_page(), and XLogSendPhysical().

2449 {
2450  char path[MAXPGPATH];
2451 
2452  /*-------
2453  * When reading from a historic timeline, and there is a timeline switch
2454  * within this segment, read from the WAL segment belonging to the new
2455  * timeline.
2456  *
2457  * For example, imagine that this server is currently on timeline 5, and
2458  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2459  * 0/13002088. In pg_wal, we have these files:
2460  *
2461  * ...
2462  * 000000040000000000000012
2463  * 000000040000000000000013
2464  * 000000050000000000000013
2465  * 000000050000000000000014
2466  * ...
2467  *
2468  * In this situation, when requested to send the WAL from segment 0x13, on
2469  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2470  * recovery prefers files from newer timelines, so if the segment was
2471  * restored from the archive on this server, the file belonging to the old
2472  * timeline, 000000040000000000000013, might not exist. Their contents are
2473  * equal up to the switchpoint, because at a timeline switch, the used
2474  * portion of the old segment is copied to the new file. -------
2475  */
2476  *tli_p = sendTimeLine;
2478  {
2479  XLogSegNo endSegNo;
2480 
2482  if (nextSegNo == endSegNo)
2483  *tli_p = sendTimeLineNextTLI;
2484  }
2485 
2486  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2487  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2488  if (state->seg.ws_file >= 0)
2489  return;
2490 
2491  /*
2492  * If the file is not found, assume it's because the standby asked for a
2493  * too old WAL segment that has already been removed or recycled.
2494  */
2495  if (errno == ENOENT)
2496  {
2497  char xlogfname[MAXFNAMELEN];
2498  int save_errno = errno;
2499 
2500  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2501  errno = save_errno;
2502  ereport(ERROR,
2504  errmsg("requested WAL segment %s has already been removed",
2505  xlogfname)));
2506  }
2507  else
2508  ereport(ERROR,
2510  errmsg("could not open file \"%s\": %m",
2511  path)));
2512 }
int wal_segment_size
Definition: xlog.c:121
#define PG_BINARY
Definition: c.h:1271
WALOpenSegment seg
Definition: xlogreader.h:243
#define ERROR
Definition: elog.h:46
#define MAXPGPATH
uint64 XLogSegNo
Definition: xlogdefs.h:48
int errcode_for_file_access(void)
Definition: elog.c:721
#define MAXFNAMELEN
#define ereport(elevel,...)
Definition: elog.h:157
static TimeLineID sendTimeLine
Definition: walsender.c:145
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1033
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int errmsg(const char *fmt,...)
Definition: elog.c:909
WALSegmentContext segcxt
Definition: xlogreader.h:242
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3210 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

3211 {
3212  WalSnd *walsnd = MyWalSnd;
3213 
3215 
3216  if (walsnd->state == state)
3217  return;
3218 
3219  SpinLockAcquire(&walsnd->mutex);
3220  walsnd->state = state;
3221  SpinLockRelease(&walsnd->mutex);
3222 }
slock_t mutex
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSndState state
#define SpinLockRelease(lock)
Definition: spin.h:64
WalSnd * MyWalSnd
Definition: walsender.c:112
#define Assert(condition)
Definition: c.h:804
Definition: regguts.h:317

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3069 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3070 {
3071  bool found;
3072  int i;
3073 
3074  WalSndCtl = (WalSndCtlData *)
3075  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3076 
3077  if (!found)
3078  {
3079  /* First time through, so initialize */
3081 
3082  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3084 
3085  for (i = 0; i < max_wal_senders; i++)
3086  {
3087  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3088 
3089  SpinLockInit(&walsnd->mutex);
3090  }
3091  }
3092 }
Size WalSndShmemSize(void)
Definition: walsender.c:3057
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:1008
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3057 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3058 {
3059  Size size = 0;
3060 
3061  size = offsetof(WalSndCtlData, walsnds);
3062  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3063 
3064  return size;
3065 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

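Definition at line 229 of file walsender.c.

Note: the listing shown below covers lines 263-286 and describes walsender initialization (creating the per-walsender shared-memory structure, notifying the postmaster, setting up the lag tracker). It does not appear to be the body of WalSndShutdown(); this is presumably a documentation-generator mismatch, so consult the source file directly for this function.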

Referenced by WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndResourceCleanup(), WalSndWaitForWal(), and WalSndWriteData().

263 {
265 
266  /* Create a per-walsender data structure in shared memory */
268 
269  /*
270  * We don't currently need any ResourceOwner in a walsender process, but
271  * if we did, we could call CreateAuxProcessResourceOwner here.
272  */
273 
274  /*
275  * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
276  * a WAL sender process, postmaster will let us outlive the bgwriter and
277  * kill us last in the shutdown sequence, so we get a chance to stream all
278  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
279  * there's no going back, and we mustn't write any WAL records after this.
280  */
283 
284  /* Initialize empty timestamp buffer for lag tracking. */
286 }
static void InitWalSenderSlot(void)
Definition: walsender.c:2369
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:307
bool RecoveryInProgress(void)
Definition: xlog.c:8237
static LagTracker * lag_tracker
Definition: walsender.c:219
MemoryContext TopMemoryContext
Definition: mcxt.c:48
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
bool am_cascading_walsender
Definition: walsender.c:116

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3038 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3039 {
3040  /* Set up signal handlers */
3042  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3043  pqsignal(SIGTERM, die); /* request shutdown */
3044  /* SIGQUIT handler was already set up by InitPostmasterChild */
3045  InitializeTimeouts(); /* establishes SIGALRM handler */
3048  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3049  * shutdown */
3050 
3051  /* Reset some signals that are accepted by postmaster but not here */
3053 }
void InitializeTimeouts(void)
Definition: timeout.c:435
#define SIGUSR1
Definition: win32_port.h:171
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3026
#define SIGCHLD
Definition: win32_port.h:169
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2931
#define SIGHUP
Definition: win32_port.h:159
#define SIG_IGN
Definition: win32_port.h:156
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:154
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid 
)
static

Definition at line 1342 of file walsender.c.

References GetCurrentTimestamp(), LagTrackerWrite(), now(), TimestampDifferenceExceeds(), and WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1343 {
1344  static TimestampTz sendTime = 0;
1346 
1347  /*
1348  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1349  * avoid flooding the lag tracker when we commit frequently.
1350  */
1351 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1352  if (!TimestampDifferenceExceeds(sendTime, now,
1354  return;
1355 
1356  LagTrackerWrite(lsn, now);
1357  sendTime = now;
1358 }
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3521
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3129 of file walsender.c.

References WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, ModifyWaitEvent(), proc_exit(), WaitEventSetWait(), and WL_POSTMASTER_DEATH.

Referenced by WalSndLoop(), WalSndWaitForWal(), and WalSndWriteData().

3130 {
3131  WaitEvent event;
3132 
3133  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3134  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3135  (event.events & WL_POSTMASTER_DEATH))
3136  proc_exit(1);
3137 }
#define FeBeWaitSetSocketPos
Definition: libpq.h:54
void proc_exit(int code)
Definition: ipc.c:104
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:948
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:168
uint32 events
Definition: latch.h:145
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1308

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1368 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), WAIT_EVENT_WAL_SENDER_WAIT_WAL, waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, and XLogBackgroundFlush().

Referenced by logical_read_xlog_page().

1369 {
1370  int wakeEvents;
1371  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1372 
1373  /*
1374  * Fast path to avoid acquiring the spinlock in case we already know we
1375  * have enough WAL available. This is particularly interesting if we're
1376  * far behind.
1377  */
1378  if (RecentFlushPtr != InvalidXLogRecPtr &&
1379  loc <= RecentFlushPtr)
1380  return RecentFlushPtr;
1381 
1382  /* Get a more recent flush pointer. */
1383  if (!RecoveryInProgress())
1384  RecentFlushPtr = GetFlushRecPtr();
1385  else
1386  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1387 
1388  for (;;)
1389  {
1390  long sleeptime;
1391 
1392  /* Clear any already-pending wakeups */
1394 
1396 
1397  /* Process any requests or signals received recently */
1398  if (ConfigReloadPending)
1399  {
1400  ConfigReloadPending = false;
1403  }
1404 
1405  /* Check for input from the client */
1407 
1408  /*
1409  * If we're shutting down, trigger pending WAL to be written out,
1410  * otherwise we'd possibly end up waiting for WAL that never gets
1411  * written, because walwriter has shut down already.
1412  */
1413  if (got_STOPPING)
1415 
1416  /* Update our idea of the currently flushed position. */
1417  if (!RecoveryInProgress())
1418  RecentFlushPtr = GetFlushRecPtr();
1419  else
1420  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1421 
1422  /*
1423  * If postmaster asked us to stop, don't wait anymore.
1424  *
1425  * It's important to do this check after the recomputation of
1426  * RecentFlushPtr, so we can send all remaining data before shutting
1427  * down.
1428  */
1429  if (got_STOPPING)
1430  break;
1431 
1432  /*
1433  * We only send regular messages to the client for fully decoded
1434  * transactions, but synchronous replication and walsender shutdown
1435  * may be waiting for a later location. So, before sleeping, we
1436  * send a ping containing the flush location. If the receiver is
1437  * otherwise idle, this keepalive will trigger a reply. Processing the
1438  * reply will update these MyWalSnd locations.
1439  */
1440  if (MyWalSnd->flush < sentPtr &&
1441  MyWalSnd->write < sentPtr &&
1443  WalSndKeepalive(false);
1444 
1445  /* check whether we're done */
1446  if (loc <= RecentFlushPtr)
1447  break;
1448 
1449  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1450  WalSndCaughtUp = true;
1451 
1452  /*
1453  * Try to flush any pending output to the client.
1454  */
1455  if (pq_flush_if_writable() != 0)
1456  WalSndShutdown();
1457 
1458  /*
1459  * If we have received CopyDone from the client, sent CopyDone
1460  * ourselves, and the output buffer is empty, it's time to exit
1461  * streaming, so fail the current WAL fetch request.
1462  */
1464  !pq_is_send_pending())
1465  break;
1466 
1467  /* die if timeout was reached */
1469 
1470  /* Send keepalive if the time has come */
1472 
1473  /*
1474  * Sleep until something happens or we time out. Also wait for the
1475  * socket becoming writable, if there's still pending output.
1476  * Otherwise we might sit on sendable output data while waiting for
1477  * new WAL to be generated. (But if we have nothing to send, we don't
1478  * want to wake on socket-writable.)
1479  */
1481 
1482  wakeEvents = WL_SOCKET_READABLE;
1483 
1484  if (pq_is_send_pending())
1485  wakeEvents |= WL_SOCKET_WRITEABLE;
1486 
1487  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1488  }
1489 
1490  /* reactivate latch so WalSndLoop knows to continue */
1491  SetLatch(MyLatch);
1492  return RecentFlushPtr;
1493 }
#define pq_is_send_pending()
Definition: libpq.h:39
XLogRecPtr write
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3129
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define WL_SOCKET_READABLE
Definition: latch.h:126
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8589
bool RecoveryInProgress(void)
Definition: xlog.c:8237
void SetLatch(Latch *latch)
Definition: latch.c:567
void ResetLatch(Latch *latch)
Definition: latch.c:660
XLogRecPtr flush
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3460
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11728
bool XLogBackgroundFlush(void)
Definition: xlog.c:3070
static bool streamingDoneSending
Definition: walsender.c:179
#define pq_flush_if_writable()
Definition: libpq.h:38
void SyncRepInitConfig(void)
Definition: syncrep.c:412
static bool WalSndCaughtUp
Definition: walsender.c:183
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3483
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2216
static void WalSndShutdown(void)
Definition: walsender.c:229
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2172
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool streamingDoneReceiving
Definition: walsender.c:180
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
static bool waiting_for_ping_response
Definition: walsender.c:171
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1704

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3172 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3173 {
3174  for (;;)
3175  {
3176  int i;
3177  bool all_stopped = true;
3178 
3179  for (i = 0; i < max_wal_senders; i++)
3180  {
3181  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3182 
3183  SpinLockAcquire(&walsnd->mutex);
3184 
3185  if (walsnd->pid == 0)
3186  {
3187  SpinLockRelease(&walsnd->mutex);
3188  continue;
3189  }
3190 
3191  if (walsnd->state != WALSNDSTATE_STOPPING)
3192  {
3193  all_stopped = false;
3194  SpinLockRelease(&walsnd->mutex);
3195  break;
3196  }
3197  SpinLockRelease(&walsnd->mutex);
3198  }
3199 
3200  /* safe to leave if confirmation is done for all WAL senders */
3201  if (all_stopped)
3202  return;
3203 
3204  pg_usleep(10000L); /* wait for 10 msec */
3205  }
3206 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3101 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3102 {
3103  int i;
3104 
3105  for (i = 0; i < max_wal_senders; i++)
3106  {
3107  Latch *latch;
3108  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3109 
3110  /*
3111  * Get latch pointer with spinlock held, for the unlikely case that
3112  * pointer reads aren't atomic (as they're 8 bytes).
3113  */
3114  SpinLockAcquire(&walsnd->mutex);
3115  latch = walsnd->latch;
3116  SpinLockRelease(&walsnd->mutex);
3117 
3118  if (latch != NULL)
3119  SetLatch(latch);
3120  }
3121 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:567
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1258 of file walsender.c.

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, MyLatch, now(), LogicalDecodingContext::out, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), resetStringInfo(), SetLatch(), SyncRepInitConfig(), TimestampTzPlusMilliseconds, WAIT_EVENT_WAL_SENDER_WRITE_DATA, wal_sender_timeout, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

1260 {
1261  TimestampTz now;
1262 
1263  /*
1264  * Fill the send timestamp last, so that it is taken as late as possible.
1265  * This is somewhat ugly, but the protocol is set as it's already used for
1266  * several releases by streaming physical replication.
1267  */
1269  now = GetCurrentTimestamp();
1270  pq_sendint64(&tmpbuf, now);
1271  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1272  tmpbuf.data, sizeof(int64));
1273 
1274  /* output previously gathered data in a CopyData packet */
1275  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1276 
1278 
1279  /* Try to flush pending output to the client */
1280  if (pq_flush_if_writable() != 0)
1281  WalSndShutdown();
1282 
1283  /* Try taking fast path unless we get too close to walsender timeout. */
1285  wal_sender_timeout / 2) &&
1286  !pq_is_send_pending())
1287  {
1288  return;
1289  }
1290 
1291  /* If we have pending write here, go to slow path */
1292  for (;;)
1293  {
1294  long sleeptime;
1295 
1296  /* Check for input from the client */
1298 
1299  /* die if timeout was reached */
1301 
1302  /* Send keepalive if the time has come */
1304 
1305  if (!pq_is_send_pending())
1306  break;
1307 
1309 
1310  /* Sleep until something happens or we time out */
1313 
1314  /* Clear any already-pending wakeups */
1316 
1318 
1319  /* Process any requests or signals received recently */
1320  if (ConfigReloadPending)
1321  {
1322  ConfigReloadPending = false;
1325  }
1326 
1327  /* Try to flush pending output to the client */
1328  if (pq_flush_if_writable() != 0)
1329  WalSndShutdown();
1330  }
1331 
1332  /* reactivate latch so WalSndLoop knows to continue */
1333  SetLatch(MyLatch);
1334 }
#define pq_is_send_pending()
Definition: libpq.h:39
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3129
int wal_sender_timeout
Definition: walsender.c:123
void ProcessConfigFile(GucContext context)
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
#define WL_SOCKET_READABLE
Definition: latch.h:126
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
void SetLatch(Latch *latch)
Definition: latch.c:567
void ResetLatch(Latch *latch)
Definition: latch.c:660
#define pq_flush_if_writable()
Definition: libpq.h:38
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void SyncRepInitConfig(void)
Definition: syncrep.c:412
Definition: guc.h:72
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3483
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2216
static void WalSndShutdown(void)
Definition: walsender.c:229
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2172
static StringInfoData tmpbuf
Definition: walsender.c:159
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:42
StringInfo out
Definition: logical.h:73
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1704
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 2826 of file walsender.c.

References elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::page_read, LogicalDecodingContext::reader, WalSnd::sentPtr, sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, XLogReadRecord(), and XLREAD_NEED_DATA.

Referenced by StartLogicalReplication(), and WalSndLoop().

2827 {
2828  XLogRecord *record;
2829  char *errm;
2830 
2831  /*
2832  * We'll use the current flush point to determine whether we've caught up.
2833  * This variable is static in order to cache it across calls. Caching is
2834  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2835  * spinlock.
2836  */
2837  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2838 
2839  /*
2840  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2841  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
2842  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2843  * didn't wait - i.e. when we're shutting down.
2844  */
2845  WalSndCaughtUp = false;
2846 
2847  while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) ==
2849  {
2851  break;
2852  }
2853 
2854  /* xlog record was invalid */
2855  if (errm != NULL)
2856  elog(ERROR, "%s", errm);
2857 
2858  if (record != NULL)
2859  {
2860  /*
2861  * Note the lack of any call to LagTrackerWrite() which is handled by
2862  * WalSndUpdateProgress which is called by output plugin through
2863  * logical decoding write api.
2864  */
2866 
2868  }
2869 
2870  /*
2871  * If first time through in this session, initialize flushPtr. Otherwise,
2872  * we only need to update flushPtr if EndRecPtr is past it.
2873  */
2874  if (flushPtr == InvalidXLogRecPtr)
2875  flushPtr = GetFlushRecPtr();
2876  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2877  flushPtr = GetFlushRecPtr();
2878 
2879  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2880  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2881  WalSndCaughtUp = true;
2882 
2883  /*
2884  * If we're caught up and have been requested to stop, have WalSndLoop()
2885  * terminate the connection in an orderly manner, after writing out all
2886  * the pending data.
2887  */
2889  got_SIGUSR2 = true;
2890 
2891  /* Update shared memory status */
2892  {
2893  WalSnd *walsnd = MyWalSnd;
2894 
2895  SpinLockAcquire(&walsnd->mutex);
2896  walsnd->sentPtr = sentPtr;
2897  SpinLockRelease(&walsnd->mutex);
2898  }
2899 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8589
XLogReadRecordResult XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
Definition: xlogreader.c:346
slock_t mutex
XLogRecPtr EndRecPtr
Definition: xlogreader.h:179
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERROR
Definition: elog.h:46
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:106
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SpinLockRelease(lock)
Definition: spin.h:64
XLogRecPtr sentPtr
WalSnd * MyWalSnd
Definition: walsender.c:112
static XLogRecPtr sentPtr
Definition: walsender.c:154
uint64 XLogRecPtr
Definition: xlogdefs.h:21
XLogReaderState * reader
Definition: logical.h:43
#define elog(elevel,...)
Definition: elog.h:232
LogicalDecodingXLogPageReadCB page_read
Definition: logical.h:44

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 2525 of file walsender.c.

References am_cascading_walsender,